This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Nakadi Avro Envelope

Andrey Dyachkov edited this page Aug 18, 2022 · 5 revisions

Nakadi Avro Envelope makes it possible to send binary events through Nakadi as envelopes that are defined by Avro schemas and consist of metadata and events.

Nakadi Client

Every published batch of events has to be wrapped in a PublishingBatch. Every consumed batch arrives wrapped in a ConsumptionBatch.

The schemas can be loaded using https://nakadi.io/manual.html#/avro-schemas/name/versions_get. The name can be either batch.publishing or batch.consumption.
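
As a rough illustration, a request for the versions of an envelope schema could be built as below. This is only a sketch: the path /avro-schemas/{name}/versions is inferred from the manual link above, while the base URL, the class name and the method name are hypothetical. Authentication is left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Set;

// Hypothetical helper, not part of any Nakadi client library.
final class EnvelopeSchemaRequests {
    // The two envelope schema names mentioned in the text.
    static final Set<String> ENVELOPE_NAMES = Set.of("batch.publishing", "batch.consumption");

    // Builds a GET request for the versions of one envelope schema.
    // The path is an assumption inferred from the manual link.
    static HttpRequest versionsRequest(String nakadiBaseUrl, String name) {
        if (!ENVELOPE_NAMES.contains(name)) {
            throw new IllegalArgumentException("Unknown envelope schema name: " + name);
        }
        URI uri = URI.create(nakadiBaseUrl + "/avro-schemas/" + name + "/versions");
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

Rejecting unknown names early keeps a typo in the schema name from turning into a confusing HTTP error later.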

Publishing

To publish events, an event type with the schema type avro_schema has to be created first.

The client has to support the following mode of operation:

  • The client instance is built during start-up and should make sure that the event type has the schema that the user defined. To fulfil this requirement, the client has to check the provided schema in Nakadi by submitting it to the corresponding endpoint. If the schema exists, the response contains its version; otherwise the client should fail to start, because an unknown schema is being used.

  • The returned schema version then has to be used in the metadata version field.
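
The start-up behaviour described above can be sketched as follows. The lookup function is a stand-in for the Nakadi call that returns the version of a known schema; its shape, like the class and method names, is an assumption made for illustration.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical start-up check, not taken from an existing Nakadi client.
final class SchemaVersionResolver {
    // Maps a schema definition to its version, or empty if Nakadi does not know it.
    private final Function<String, Optional<String>> versionLookup;

    SchemaVersionResolver(Function<String, Optional<String>> versionLookup) {
        this.versionLookup = versionLookup;
    }

    // Returns the version to put into the metadata version field,
    // or throws so that the client does not start with an unknown schema.
    String resolveOrFail(String eventType, String schemaJson) {
        return versionLookup.apply(schemaJson).orElseThrow(() ->
                new IllegalStateException("Schema is not registered for event type " + eventType));
    }
}
```

Resolving the version once at start-up means that every published batch can reuse the same value without further calls to Nakadi.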

Consumption

Nakadi Avro events can be consumed only through the Subscription API. Consumption uses the same REST API as for JSON (GET / POST /subscriptions/{id}/events), but the client additionally has to send the header Accept: application/avro-binary. The response is sent as a ConsumptionBatch.

The client should support the following modes of operation:
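
A minimal sketch of such a request, using the JDK HTTP client API, might look like this. The base URL, the subscription id and the class name are hypothetical, and authentication is omitted.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for building an Avro consumption request.
final class AvroConsumptionRequests {
    // Media type named in the text for binary Avro consumption.
    static final String AVRO_BINARY = "application/avro-binary";

    // Builds GET /subscriptions/{id}/events with the Avro Accept header.
    static HttpRequest eventsRequest(String nakadiBaseUrl, String subscriptionId) {
        URI uri = URI.create(nakadiBaseUrl + "/subscriptions/" + subscriptionId + "/events");
        return HttpRequest.newBuilder(uri)
                .header("Accept", AVRO_BINARY)
                .GET()
                .build();
    }
}
```

The only difference from a JSON consumer in this sketch is the Accept header; the response body then has to be decoded as a ConsumptionBatch rather than parsed as JSON.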

  • The ConsumptionBatch version can be upgraded when the client reconnects to Nakadi. Nakadi does not yet provide a mechanism for notifying the client about the version in use.
  • The client payload schema version can be upgraded. To deserialize the data, the client should always be aware of the latest schema, possibly by downloading the latest schema version on deserialization errors (Apache Avro supports separate reader and writer schemas for deserialization).
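
The "download the latest schema on deserialization errors" idea can be sketched generically as below. The supplier and the decode function are placeholders for a real schema fetch and a real Avro decoder; all names are hypothetical, and the class is not thread-safe.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical decoder wrapper: S is the schema type, T the decoded event type.
final class RefreshingDecoder<S, T> {
    private final Supplier<S> latestSchema;          // e.g. fetches the newest schema from Nakadi
    private final BiFunction<S, byte[], T> decoder;  // e.g. wraps an Avro datum reader
    private S cachedSchema;

    RefreshingDecoder(Supplier<S> latestSchema, BiFunction<S, byte[], T> decoder) {
        this.latestSchema = latestSchema;
        this.decoder = decoder;
        this.cachedSchema = latestSchema.get();
    }

    // Decodes with the cached schema; on failure, refreshes the schema once and retries.
    // A second failure is propagated to the caller.
    T decode(byte[] payload) {
        try {
            return decoder.apply(cachedSchema, payload);
        } catch (RuntimeException firstFailure) {
            cachedSchema = latestSchema.get();
            return decoder.apply(cachedSchema, payload);
        }
    }
}
```

Retrying only once keeps a genuinely corrupt payload from triggering an endless loop of schema downloads.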

Java

Code generation from schema

The approach is that the user manages only the Nakadi Event Type CRD, and the Apache Avro Gradle/Maven plugin generates the POJOs during the build phase.

  1. The user creates the Avro schema using the Nakadi Operator.
  2. The user adds the Avro plugin for POJO generation.
  3. The user uses the generated POJOs or replaces the current ones (this could be just a matter of configuring the Avro plugin to generate into a dedicated package).
  4. The Nakadi client verifies the event type schema against the schema contained in the POJO.

Gradle plugin set up

The recommended plugin is https://github.com/davidmc24/gradle-avro-plugin

build.gradle:

...
plugins {
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}
...
apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"
...

import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
    source("src/main/resources/avro-schema")
    outputDir = file("build/generated/sources")
}

Migration to existing POJO

To ease migration to the binary format, the client can serialise existing POJO classes to Avro binary format using the Jackson Avro extension (jackson-dataformat-avro).

Example:

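// Requires the jackson-dataformat-avro dependency.
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

public class Employee
{
    public String name;
    public int age;
    public String[] emails;
    public Employee boss;
}

AvroMapper mapper = new AvroMapper();
// Derive the Avro schema from the POJO; a schema parsed with
// mapper.schemaFrom(...) can be used the same way.
AvroSchema schema = mapper.schemaFor(Employee.class);

// Serialise an existing Employee instance (empl) to Avro binary.
byte[] avroData = mapper.writer(schema)
   .writeValueAsBytes(empl);

// Deserialise the Avro binary back into a POJO.
Employee decoded = mapper.readerFor(Employee.class)
   .with(schema)
   .readValue(avroData);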