Commit

Tool to replay events
larousso committed Aug 2, 2024
1 parent 4aa30b0 commit e96f0d4
Showing 1 changed file with 20 additions and 6 deletions.
@@ -8,6 +8,7 @@
 import scala.util.hashing.MurmurHash3$;
 
 import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 
@@ -61,20 +62,33 @@ default Mono<List<EventEnvelope<E, Meta, Context>>> markAsPublished(List<EventEn
      *
      * @param fromSequenceNum sequence num to start with
      * @param parallelism concurrent factor
-     * @param maxEventsToHandle limit to n events
+     * @param pageSize maximum number of events to load for this page
      * @param handle the handling function, for example to build a new projection
      * @return the last sequence num handled
      */
-    default Mono<Long> concurrentReplay(Long fromSequenceNum, Integer parallelism, Option<Integer> maxEventsToHandle, Function<Flux<EventEnvelope<E, Meta, Context>>, Mono<Tuple0>> handle) {
+    default Mono<Long> concurrentReplayPage(Long fromSequenceNum, Integer parallelism, Integer pageSize, BiFunction<Integer, Flux<EventEnvelope<E, Meta, Context>>, Mono<Tuple0>> handle) {
         LongAccumulator lastSeqNum = new LongAccumulator(Long::max, 0);
-        EventStore.Query.Builder tmpQuery = EventStore.Query.builder().withSequenceFrom(fromSequenceNum);
-        return this.loadEventsByQuery(maxEventsToHandle.fold(() -> tmpQuery, tmpQuery::withSize).build())
-                .groupBy(evt -> MurmurHash3$.MODULE$.stringHash(evt.entityId) % parallelism)
-                .flatMap(flux -> handle.apply(flux.doOnNext(evt -> lastSeqNum.accumulate(evt.sequenceNum))), parallelism)
+        return this.loadEventsByQuery(EventStore.Query.builder().withSequenceFrom(fromSequenceNum).withSize(pageSize).build())
+                .groupBy(evt -> Math.abs(MurmurHash3$.MODULE$.stringHash(evt.entityId)) % parallelism)
+                .flatMap(flux -> handle.apply(flux.key(), flux.doOnNext(evt -> lastSeqNum.accumulate(evt.sequenceNum))), parallelism)
                 .last()
                 .map(any -> lastSeqNum.get());
     }

+    /**
+     * Stream all elements from the journal and execute a handling function concurrently.
+     * Events are sharded by entity id, so events for the same entity won't be handled concurrently.
+     *
+     * @param parallelism concurrent factor
+     * @param pageSize number of events kept in memory per page
+     * @param handle the handling function, for example to build a new projection
+     * @return the last sequence num handled
+     */
+    default Mono<Long> concurrentReplay(Integer parallelism, Integer pageSize, BiFunction<Integer, Flux<EventEnvelope<E, Meta, Context>>, Mono<Tuple0>> handle) {
+        Function<Long, Mono<Long>> run = n -> concurrentReplayPage(n, parallelism, pageSize, handle);
+        return run.apply(0L).expand(run).last();
+    }

     EventStore<TxCtx, E, Meta, Context> toEventStore();
 
     static <TxCtx, E extends Event, Meta, Context> ReactorEventStore<TxCtx, E, Meta, Context> fromEventStore(EventStore<TxCtx, E, Meta, Context> eventStore) {
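
A minimal usage sketch of the new concurrentReplayPage method, not part of the commit: the ReplaySinglePage class, the ProjectionUpdate interface and the chosen parameter values are illustrative assumptions, Tuple0 is assumed to be Vavr's io.vavr.Tuple0, and the library's own imports (ReactorEventStore, EventEnvelope, Event) are omitted because their package is not shown in this diff.

import io.vavr.Tuple;
import io.vavr.Tuple0;
import reactor.core.publisher.Mono;

class ReplaySinglePage<TxCtx, E extends Event, Meta, Context> {

    // Hypothetical read-model update; not part of the library.
    interface ProjectionUpdate<E extends Event, Meta, Context> {
        Mono<Tuple0> apply(EventEnvelope<E, Meta, Context> envelope);
    }

    Mono<Long> replayFirstPage(ReactorEventStore<TxCtx, E, Meta, Context> eventStore,
                               ProjectionUpdate<E, Meta, Context> projection) {
        return eventStore.concurrentReplayPage(
                0L,    // fromSequenceNum: start at the beginning of the journal
                4,     // parallelism: events are sharded on hash(entityId) % 4
                1000,  // pageSize: load at most 1000 events for this page
                (shard, events) -> events
                        // all events of one entity land in the same shard, so the
                        // projection sees them in order
                        .concatMap(projection::apply)
                        .then(Mono.just(Tuple.empty())));
    }
}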

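A similar sketch for the new concurrentReplay entry point, which replays the whole journal page by page; again the RebuildReadModel class, the printed log line and the parameter values are assumptions, not taken from the commit.

import io.vavr.Tuple;
import reactor.core.publisher.Mono;

class RebuildReadModel<TxCtx, E extends Event, Meta, Context> {

    Mono<Long> rebuild(ReactorEventStore<TxCtx, E, Meta, Context> eventStore) {
        return eventStore.concurrentReplay(
                8,    // parallelism: 8 shards processed concurrently
                500,  // pageSize: each underlying page loads at most 500 events
                (shard, events) -> events
                        .doOnNext(envelope ->
                                // placeholder side effect: a real projection would
                                // upsert a read-model row here instead of printing
                                System.out.println("shard " + shard + " handled seq " + envelope.sequenceNum))
                        .then(Mono.just(Tuple.empty())));
    }
}

The full replay chains pages with Reactor's expand operator: each concurrentReplayPage call returns the last sequence number it handled, and that value seeds the next page, so only about pageSize events should be in memory at a time regardless of journal size.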