DelayedQueue
is a library to enqueue and later handle immutable events. It is written in java
on top of reactorand lettuce.
Redis is the only available storage engine. Queue doesn't provide durability guarantees but in pair with clusterized redis
installation
it is suitable for many use cases.
Key features are:
- event handling is retriable at increased intervals between attempts
- subscription context could be passed to a handler
- support for the
@PreDestroy
lifecycle annotation - sending metrics using Micrometer
DelayedQueue
are highly opinionated (hence customizable), with very little configuration needed to start using it.
If you want more control consider using Netflix's dyno-queue.
import static com.github.fred84.queue.DelayedEventService.delayedEventService;
var eventService = delayedEventService().client(redisClient).build();
import static com.github.fred84.queue.DelayedEventService.delayedEventService;
eventService = delayedEventService()
.client(redisClient)
.mapper(objectMapper)
.handlerScheduler(Schedulers.fromExecutorService(executor))
.schedulingInterval(Duration.ofSeconds(1))
.schedulingBatchSize(SCHEDULING_BATCH_SIZE)
.enableScheduling(false)
.pollingTimeout(POLLING_TIMEOUT)
.eventContextHandler(new DefaultEventContextHandler())
.dataSetPrefix("")
.retryAttempts(10)
.metrics(new NoopMetrics())
.refreshSubscriptionsInterval(Duration.ofMinutes(5))
.build();
eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();
eventService.close();
eventService.addHandler(
DummyEvent.class,
e -> Mono
.subscriberContext()
.doOnNext(ctx -> {
Map<String, String> eventContext = ctx.get("eventContext");
log.info("context key {}", eventContext.get("key"));
})
.thenReturn(true),
1
);
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();
Contributions are welcome. Just create a pull request.