
Java high level Nakadi consumer.

What does the project already implement?

  • Automatic reconnect to Nakadi server
  • Consumer group partition offset management with Zookeeper
  • Leader election of the topic partition per consumer group
  • Redistribution of partitions between consumer groups
  • Automatic failover between clients in a consumer group
  • Automatic rebalance after a new partition is added to a Nakadi topic
  • OAuth2
  • Separate generic spring boot client
  • Handle failed events with SQS queue

Project structure

├── paradox-nakadi-consumer-core                        # Core implementation
├── paradox-nakadi-consumer-partitioned-zk              # Offset management with Zookeeper / Exhibitor
├── paradox-nakadi-consumer-boot                        # Spring Boot bindings
├── paradox-nakadi-consumer-example-boot                # Spring Boot usage example
├── paradox-nakadi-consumer-sqs-error-handler           # Failed event handling with Amazon SQS

Build the project and install locally

gradlew clean
gradlew build
gradlew install


Maven dependencies

    <version>see above</version>

For error handling

    <version>see above</version>

Choose your Partition Coordinator Provider

- simple     SimplePartitionCoordinator           # no persistence, no coordination
- zk         ZKLeaderConsumerPartitionCoordinator # persistence in Zookeeper, leader election,
                                                    only one consumer from the consumer group per topic partition
- zk-simple  ZKSimpleConsumerPartitionCoordinator # offset persistence in Zookeeper, no partition coordination


Provide a unique defaultConsumerName, e.g. your application ID. Offset tracking and topic partition leader election are done per consumer group name.

        enabled: true
        region: eu-central-1
      nakadiTokenId: nakadi-event-stream-read
      defaultConsumerName: yourConsumerGroupName
      oauth2Enabled : true
      partitionCoordinatorProvider: zk
      exhibitorPort: 8181
      eventsStreamTimeoutSeconds: 900
      eventsBatchLimit: 1

    # batch settings can be overridden per consumer
        eventsBatchLimit: 100
        eventsBatchTimeoutSeconds: 3

  enableMock: false
  startAfterCreation: true

    - tokenId:  nakadi-event-stream-read
      scopes: your scopes


Model your event

@JsonIgnoreProperties(ignoreUnknown = true)
public class Metadata {

    private UUID eid;

    private Date occurredAt;

    private String eventType;

    private Date receivedAt;

    private String flowId;
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class OrderReceived {

    private Metadata metadata;

    private String orderNumber;
}

Add event handler

@NakadiHandler(eventName = "order.ORDER_RECEIVED")
public class OrderReceivedHandler implements BatchEventsHandler<OrderReceived> {

    @Override
    public void onEvent(final EventTypeCursor cursor, final OrderReceived orderReceived) {
        // your code to handle the OrderReceived event
    }

    @Override
    public Class<OrderReceived> getEventClass() {
        return OrderReceived.class;
    }
}

Other handlers


RawContentHandler receives the JSON payload with the cursor and all events. The content is not interpreted and can contain many events (the number is determined by eventsBatchLimit). Keep-alive events are not filtered; in that case the content has only the cursor object and no events.

public class Handlers {
    public RawContentHandler rawContentHandler() {
        return new RawContentHandler() {

            @Override
            @NakadiHandler(eventName = EVENT_NAME, consumerName = "test-raw-content-consumer")
            public void onEvent(final EventTypeCursor cursor, final String content) {
                // your code to handle the raw content
            }
        };
    }
}

Posting one or more Events

curl -v -H 'Content-Type: application/json' -XPOST http://localhost:8080/event-types/order.ORDER_RECEIVED/events -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }, {
    "order_number": "24873243242",
    "metadata": {
      "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
      "occurred_at": "2016-03-15T23:47:16+01:00"
    }
  }]'

RawContentHandler.onEvent(cursor, content) can be invoked once with EventTypeCursor{name=order.ORDER_RECEIVED, partition=0, offset=1} and the content below. Because the content contains 2 events, the next cursor with events would be EventTypeCursor{name=order.ORDER_RECEIVED, partition=0, offset=3}.


RawContentHandler.onEvent(cursor,content) invoked with keep-alive content



RawEventHandler receives the JSON payload of a single event. Keep-alive events are filtered out and the handler is not invoked for them.

@NakadiHandler(eventName = EVENT_NAME)
public static class MyRawEventHandler implements RawEventHandler {

    @Override
    public void onEvent(final EventTypeCursor cursor, final String content) {
        // your code to handle the raw event
    }
}

Posting one or more Events

curl -v -H 'Content-Type: application/json' -XPOST http://localhost:8080/event-types/order.ORDER_RECEIVED/events -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }, {
    "order_number": "24873243242",
    "metadata": {
      "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
      "occurred_at": "2016-03-15T23:47:16+01:00"
    }
  }]'

First invocation with EventTypeCursor{name=order.ORDER_RECEIVED, partition=0, offset=5} and content


Second invocation with EventTypeCursor{name=order.ORDER_RECEIVED, partition=0, offset=6} and content



JsonEventHandler is analogous to RawEventHandler but provides a JsonNode object.

@NakadiHandler(eventName = EVENT_NAME)
public static class MyJsonEventHandler implements JsonEventHandler {

    @Override
    public void onEvent(final EventTypeCursor cursor, final JsonNode jsonNode) {
        // your code to handle the jsonNode event
    }
}

Bulk handlers BatchEventsBulkHandler, RawEventResponseBulkHandler and JsonEventResponseBulkHandler

JSON payload with cursor and list of events. The cursor represents the last element and it is passed as commit object to the offset manager (only one commit is performed). List length is determined by eventsBatchLimit.

public class Handlers {
    public BatchEventsBulkHandler<OrderReceived> batchEventsBulkHandler() {
        return new BatchEventsBulkHandler<OrderReceived>() {

            @Override
            @NakadiHandler(eventName = EVENT_NAME)
            public void onEvent(EventTypeCursor cursor, List<OrderReceived> list) {
                // your code to handle the event list
            }

            @Override
            public Class<OrderReceived> getEventClass() {
                return OrderReceived.class;
            }
        };
    }
}
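
The "only one commit" behaviour described above can be pictured with a small, self-contained sketch. It does not use the library: Cursor and RecordingOffsetManager are hypothetical stand-ins for EventTypeCursor and the offset manager, and deliverBatch is an illustrative name, not a library method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class BulkCommitSketch {

    // Hypothetical stand-in for the library's EventTypeCursor (illustration only).
    static final class Cursor {
        final String partition;
        final String offset;

        Cursor(final String partition, final String offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical offset manager that simply records every commit it receives.
    static final class RecordingOffsetManager {
        final List<Cursor> commits = new ArrayList<>();

        void commit(final Cursor cursor) {
            commits.add(cursor);
        }
    }

    /** Hands the whole batch to the bulk handler, then commits the last cursor exactly once. */
    static void deliverBatch(final Cursor lastCursor, final List<String> events,
            final BiConsumer<Cursor, List<String>> bulkHandler, final RecordingOffsetManager offsets) {
        bulkHandler.accept(lastCursor, events);
        offsets.commit(lastCursor);
    }

    public static void main(final String[] args) {
        final RecordingOffsetManager offsets = new RecordingOffsetManager();
        deliverBatch(new Cursor("0", "6"), List.of("event-5", "event-6"),
            (cursor, events) -> System.out.println(events.size() + " events up to offset " + cursor.offset),
            offsets);
        System.out.println("commits: " + offsets.commits.size());
    }
}
```

However many events the list holds, the sketch records a single commit per batch, which is the property the bulk handlers are described as having.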

NakadiBatchEventsHandler, NakadiRawContentHandler, NakadiRawEventHandler, NakadiJsonEventHandler (bulk NakadiBatchEventsBulkHandler, NakadiRawEventBulkHandler, NakadiJsonEventBulkHandler)

One handler class for different events and consumer groups

public NakadiEventConsumers testNakadiEventConsumers() {
    return new NakadiEventConsumers(ImmutableSet.of(NakadiEventConsumer.of(EVENT_NAME_1, "test-multi1"),
                NakadiEventConsumer.of(EVENT_NAME_2, "test-multi2")));
}

public RawContentHandler createHandlers(final NakadiEventConsumers testNakadiEventConsumers) {
    return new NakadiRawContentHandler() {

        @Override
        public NakadiEventConsumers getNakadiEventConsumers() {
            return testNakadiEventConsumers;
        }

        @Override
        public void onEvent(final EventTypeCursor cursor, final String content) {
            // your code
        }
    };
}

Exception handling in handler

Because a single exception should not stop the whole stream processing, a java.lang.Exception thrown in the onEvent handler is suppressed and only logged. If required, the implementor should take care of replaying individual failed events from the stream. If a Zookeeper Partition Coordinator is used, java.lang.Error and any subclass of de.zalando.paradox.nakadi.consumer.core.exceptions.UnrecoverableException will disconnect the topic-partition processor from the Nakadi server and resume from the last committed offset.
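
The policy above can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: the nested UnrecoverableException is a local stand-in for de.zalando.paradox.nakadi.consumer.core.exceptions.UnrecoverableException, events are plain strings, and EventDispatchSketch and dispatch are hypothetical names. It assumes the Zookeeper coordinator case, where unrecoverable failures propagate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EventDispatchSketch {

    // Local stand-in for the library's UnrecoverableException (assumption for illustration).
    static class UnrecoverableException extends RuntimeException {
        UnrecoverableException(final String message) {
            super(message);
        }
    }

    /**
     * Passes each event to the handler. Ordinary exceptions are recorded and skipped;
     * UnrecoverableException (and any java.lang.Error, which is never caught here)
     * propagates so the caller can disconnect and later resume from the last committed offset.
     * Returns the events whose handling failed with a suppressed exception.
     */
    static List<String> dispatch(final List<String> events, final Consumer<String> handler) {
        final List<String> failed = new ArrayList<>();
        for (final String event : events) {
            try {
                handler.accept(event);
            } catch (final UnrecoverableException e) {
                throw e; // stop processing this stream
            } catch (final Exception e) {
                failed.add(event); // suppressed; a real consumer would log it here
            }
        }
        return failed;
    }

    public static void main(final String[] args) {
        final List<String> failed = dispatch(List.of("a", "bad", "c"), event -> {
            if (event.equals("bad")) {
                throw new IllegalStateException("cannot handle " + event);
            }
        });
        System.out.println("suppressed failures: " + failed);
    }
}
```

In the sketch the stream continues past the failing event, and the returned list is what an implementor would need in order to replay the failed events later.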

There is also an EventErrorHandler interface. By default, there is no implementation of this interface.

public interface EventErrorHandler {

    /**
     * This callback method will be called when an exception occurs. The exception can have many causes, such as a
     * broken event, an unparsable event body, a database connection timeout, etc.
     *
     * @param  consumerName        Event consumer name
     * @param  t                   Thrown exception itself
     * @param  eventTypePartition  EventTypePartition contains eventType and partition information
     * @param  offset              Current offset
     * @param  rawEvent            Raw event body itself
     */
    void onError(String consumerName, Throwable t, EventTypePartition eventTypePartition, @Nullable String offset, String rawEvent);
}

Each project should specify what to do with failed events. They can be written to a database, log files, Elasticsearch, etc.

The example usage can be found below.

public class LoggingEventErrorHandler implements EventErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingEventErrorHandler.class);

    @Override
    public void onError(final String consumerName, final Throwable t, final EventTypePartition eventTypePartition, @Nullable final String offset,
            final String rawEvent) {
        LOGGER.error(
            "Failed Event // Consumer Name = [{}] , Event Partition = [{}] , Event Type = [{}] , Event Offset = [{}] , Raw Event = [{}] //",
            consumerName, eventTypePartition.getPartition(), eventTypePartition.getEventType(), offset, rawEvent, t);
    }
}

If the SQS error handler is enabled, it will store the failed events to the SQS queue automatically.

Here are the configuration explanations

paradox.nakadi.errorhandler.sqs.enabled: If it is true, the handler is bound to context automatically.

paradox.nakadi.errorhandler.sqs.queueUrl: Amazon SQS URL of the queue to store failed events.

paradox.nakadi.errorhandler.sqs.region: See available AWS regions in AWS Documentation

Spring Boot support endpoints

Stop and restart event receivers

curl -XPOST http://host:port/nakadi/event-receivers/stop   
curl -XPOST http://host:port/nakadi/event-receivers/restart

Replay and restore events

  1. Get current event consumer handlers

        curl -X GET 'http://host:port/nakadi/event-handlers'
  2. Replay event from Nakadi

    • all consumer handlers receiving order.ORDER_RECEIVED
        curl -X POST 'http://host:port/nakadi/event-handlers/event_types/order.ORDER_RECEIVED/partitions/0/offsets/0/replays'
    • consumer test-raw-event-consumer receiving order.ORDER_RECEIVED
        curl -X POST 'http://host:port/nakadi/event-handlers/event_types/order.ORDER_RECEIVED/partitions/0/offsets/0/replays?consumer_name=test-raw-event-consumer&verbose=true'
  3. Restore events from file

       curl --data-binary @0-16516140.json -H "Content-Type: application/json" -X POST 'http://localhost:8082/nakadi/event-handlers/event_types/order.ORDER_RECEIVED/partitions/0/restores'
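
For services that want to trigger a replay from Java rather than curl, the request for all consumer handlers can be built with the JDK's java.net.http API (Java 11+). This is a sketch under assumptions: ReplayRequestSketch and replayRequest are hypothetical names, and the base URL in main is only an example host.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ReplayRequestSketch {

    /** Builds (but does not send) the POST request that replays events for all consumer handlers. */
    static HttpRequest replayRequest(final String baseUrl, final String eventType, final int partition,
            final String offset) {
        final String url = baseUrl + "/nakadi/event-handlers/event_types/" + eventType
            + "/partitions/" + partition + "/offsets/" + offset + "/replays";
        return HttpRequest.newBuilder(URI.create(url))
            .POST(HttpRequest.BodyPublishers.noBody()) // the curl example sends no body
            .build();
    }

    public static void main(final String[] args) {
        // Example host only; replace with the host and port of your Spring Boot application.
        final HttpRequest request = replayRequest("http://localhost:8082", "order.ORDER_RECEIVED", 0, "0");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the sketch stops short of that so it runs without a live server.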

Replay failed events

  1. Get list of failed event sources

        curl -X GET 'http://host:port/nakadi/failed-event-sources'
  2. Replay failed events from sources

        curl -X POST 'http://host:port/nakadi/failed-event-sources/<FAILED_EVENT_SOURCE_NAME>/?number_of_failed_events=<NUMBER_OF_FAILED_EVENTS_WILL_BE_REPLAYED>&break_processing_on_exception=true'

break_processing_on_exception stops the replay when an exception occurs. The default value is false.

  3. (Optional) Get number of failed events

        curl -X GET 'http://host:port/nakadi/failed-event-sources/<FAILED_EVENT_SOURCE_NAME>'
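
One plausible reading of number_of_failed_events and break_processing_on_exception is sketched below. This is an assumption about the semantics, not the library's implementation: FailedEventReplaySketch and replay are hypothetical names, and failed events are modelled as plain strings.

```java
import java.util.List;
import java.util.function.Consumer;

class FailedEventReplaySketch {

    /**
     * Replays at most maxEvents failed events through the handler and returns how many succeeded.
     * With breakOnException the replay stops at the first failure; otherwise the failing
     * event is skipped and the replay continues.
     */
    static int replay(final List<String> failedEvents, final int maxEvents, final boolean breakOnException,
            final Consumer<String> handler) {
        int succeeded = 0;
        final int limit = Math.min(maxEvents, failedEvents.size());
        for (int i = 0; i < limit; i++) {
            try {
                handler.accept(failedEvents.get(i));
                succeeded++;
            } catch (final RuntimeException e) {
                if (breakOnException) {
                    break; // stop the replay at the first failing event
                }
                // default behaviour: continue with the next failed event
            }
        }
        return succeeded;
    }

    public static void main(final String[] args) {
        final Consumer<String> handler = event -> {
            if (event.equals("bad")) {
                throw new IllegalStateException("still failing: " + event);
            }
        };
        final List<String> events = List.of("a", "bad", "c");
        System.out.println("continue on exception: " + replay(events, 3, false, handler));
        System.out.println("break on exception: " + replay(events, 3, true, handler));
    }
}
```

With the default (false), one still-failing event does not block the events behind it; setting the flag to true is useful when events must be reprocessed strictly in order.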