Skip to content

Priytam/delayedQueue

Repository files navigation

Delayed queue

DelayedQueue is a library to enqueue and later handle immutable events. It is written in java on top of reactorand lettuce. Redis is the only available storage engine. Queue doesn't provide durability guarantees but in pair with clusterized redis installation it is suitable for many use cases.

Key features are:

  • event handling is retriable at increased intervals between attempts
  • subscription context could be passed to a handler
  • support for the @PreDestroy lifecycle annotation
  • sending metrics using Micrometer

DelayedQueue are highly opinionated (hence customizable), with very little configuration needed to start using it. If you want more control consider using Netflix's dyno-queue.

Examples

Minimal configuration

import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();

Full configuration

import static com.github.fred84.queue.DelayedEventService.delayedEventService;

eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();

Add event handler

eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);

Enqueue event

eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();

Close service

eventService.close();

Event context

eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);

eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();

Contribution

Contributions are welcome. Just create a pull request.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages