Nakadi Avro Envelope
Nakadi Avro Envelope makes it possible to send binary events through Nakadi in the form of envelopes, defined by Avro schemas, that contain metadata and events.
Every published batch of events has to be wrapped in a PublishingBatch. Every consumed batch arrives wrapped in a ConsumptionBatch.
The schemas can be loaded using https://nakadi.io/manual.html#/avro-schemas/name/versions_get, where name is either batch.publishing or batch.consumption.
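The schema lookup can be sketched with the JDK's java.net.http client. This is a minimal illustration only: the path /avro-schemas/{name}/versions is inferred from the manual link, and the class and method names are hypothetical. The request is built but not sent, so the sketch makes no assumptions about the response format.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Set;

// Hypothetical helper: builds the request that lists the versions of one of
// the Nakadi envelope schemas. The path is an assumption based on the manual link.
final class EnvelopeSchemas {
    static final Set<String> ENVELOPE_NAMES = Set.of("batch.publishing", "batch.consumption");

    static HttpRequest versionsRequest(String nakadiBaseUrl, String name) {
        if (!ENVELOPE_NAMES.contains(name)) {
            throw new IllegalArgumentException("Unknown envelope schema: " + name);
        }
        // Drop a trailing slash so the path is not doubled.
        String base = nakadiBaseUrl.endsWith("/")
                ? nakadiBaseUrl.substring(0, nakadiBaseUrl.length() - 1)
                : nakadiBaseUrl;
        URI uri = URI.create(base + "/avro-schemas/" + name + "/versions");
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; restricting the name to the two envelope schemas catches typos before any network call.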
To publish events, an event type with the schema type avro_schema has to be created first.
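A request body for creating such an event type might look like the sketch below. It assumes the Nakadi event type fields name, owning_application, category, enrichment_strategies and schema, with the Avro schema embedded as a string in schema.schema; the chosen category and enrichment strategy are illustrative assumptions, and the EventTypeBody class is hypothetical.

```java
// Hypothetical helper that builds a minimal event type creation body.
// Field names and the "business" category are assumptions for illustration.
final class EventTypeBody {

    // Minimal JSON string escaping, enough to embed an Avro schema document.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"') {
                sb.append("\\\"");
            } else if (c == '\\') {
                sb.append("\\\\");
            } else if (c == '\n') {
                sb.append("\\n");
            } else if (c == '\r') {
                sb.append("\\r");
            } else if (c == '\t') {
                sb.append("\\t");
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // The Avro schema travels as a string, with schema.type set to "avro_schema".
    static String create(String name, String owningApplication, String avroSchema) {
        return "{"
                + "\"name\":" + jsonString(name) + ","
                + "\"owning_application\":" + jsonString(owningApplication) + ","
                + "\"category\":\"business\","
                + "\"enrichment_strategies\":[\"metadata_enrichment\"],"
                + "\"schema\":{\"type\":\"avro_schema\",\"schema\":" + jsonString(avroSchema) + "}"
                + "}";
    }
}
```

In a real client a JSON library would replace the hand-written escaping; it is spelled out here only to keep the sketch dependency-free.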
The client has to support the following mode of operation:
- the client instance is built during start-up and must make sure that the event type has the schema the user defined. To fulfil this requirement, the client has to check the provided schema in Nakadi using the following endpoint by providing the schema. In response it receives the version of the schema if it exists; otherwise the client should fail to start, because an unknown schema is being used.
- the returned version of the schema then has to be used in the metadata version field.
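The start-up check can be sketched as a fail-fast factory. The lookup endpoint is not named here, so it is hidden behind a hypothetical SchemaVersionLookup interface; AvroPublisher is likewise a hypothetical name. With Avro-generated classes, the user schema would typically come from the generated class, for example a hypothetical Order.getClassSchema().toString().

```java
import java.util.Optional;

// Hypothetical abstraction over the Nakadi endpoint that returns the version
// of a schema registered for an event type, if Nakadi knows that schema.
interface SchemaVersionLookup {
    Optional<String> findVersion(String eventType, String schema);
}

// Hypothetical publisher that can only be built when its schema is known to Nakadi.
final class AvroPublisher {
    final String eventType;
    // Value to put into the metadata "version" field of every published event.
    final String schemaVersion;

    private AvroPublisher(String eventType, String schemaVersion) {
        this.eventType = eventType;
        this.schemaVersion = schemaVersion;
    }

    // Called once at start-up: fails fast if the user's schema is unknown to Nakadi.
    static AvroPublisher create(String eventType, String userSchema, SchemaVersionLookup lookup) {
        String version = lookup.findVersion(eventType, userSchema)
                .orElseThrow(() -> new IllegalStateException(
                        "Schema for event type '" + eventType + "' is unknown to Nakadi; refusing to start"));
        return new AvroPublisher(eventType, version);
    }
}
```

Resolving the version once at construction keeps the publishing path free of extra lookups, and throwing from the factory prevents the application from starting with an unregistered schema.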
For consuming Nakadi Avro events, only the Subscription API can be used.
Consumption happens over the same REST API as for JSON (GET / POST /subscriptions/{id}/events), but in addition the client has to provide the header Accept: application/avro-binary. In response, a ConsumptionBatch is sent.
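A streaming request with the required Accept header might be built as follows. This is a sketch using the JDK HTTP client; AvroConsumption and streamRequest are hypothetical names, and only the GET variant is shown.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper building the Subscription API streaming request for Avro.
final class AvroConsumption {
    static final String AVRO_BINARY = "application/avro-binary";

    static HttpRequest streamRequest(String nakadiBaseUrl, String subscriptionId) {
        // Drop a trailing slash so the path is not doubled.
        String base = nakadiBaseUrl.endsWith("/")
                ? nakadiBaseUrl.substring(0, nakadiBaseUrl.length() - 1)
                : nakadiBaseUrl;
        URI uri = URI.create(base + "/subscriptions/" + subscriptionId + "/events");
        return HttpRequest.newBuilder(uri)
                .header("Accept", AVRO_BINARY) // ask Nakadi for ConsumptionBatch in Avro binary
                .GET()
                .build();
    }
}
```

The request could be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofInputStream())`, after which the client decodes ConsumptionBatch records from the stream; how batches are framed on the wire is not described here.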
The client should support the following mode of operation:
- The ConsumptionBatch version can be upgraded when the client reconnects to Nakadi. Nakadi does not yet provide a mechanism for notifying the client about the version in use.
- The client payload schema version can be upgraded. To deserialize the data, the client should always be aware of the latest schema, for example by downloading the latest schema version on deserialization errors (Apache Avro supports separate reader and writer schemas for deserialization).
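The "refresh on deserialization error" idea can be sketched independently of Avro. In the hypothetical RefreshingDecoder below, the schema supplier stands in for a call to Nakadi's schema endpoint, and the decode function stands in for Avro decoding (for instance, a GenericDatumReader built from writer and reader schemas); both are assumptions, not a prescribed design.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical sketch: S is the schema type, T the decoded event type.
// On a decoding failure the latest schema is fetched once and decoding is retried.
final class RefreshingDecoder<S, T> {
    private final Supplier<S> latestSchema;        // e.g. download the latest version from Nakadi
    private final BiFunction<S, byte[], T> decoder; // e.g. Avro decoding with reader/writer schema
    private S current;

    RefreshingDecoder(Supplier<S> latestSchema, BiFunction<S, byte[], T> decoder) {
        this.latestSchema = latestSchema;
        this.decoder = decoder;
        this.current = latestSchema.get();
    }

    T decode(byte[] payload) {
        try {
            return decoder.apply(current, payload);
        } catch (RuntimeException firstFailure) {
            S refreshed = latestSchema.get();
            if (refreshed.equals(current)) {
                throw firstFailure; // no newer schema available, so the error is genuine
            }
            current = refreshed;
            return decoder.apply(current, payload);
        }
    }

    S currentSchema() {
        return current;
    }
}
```

Retrying only once, and only when the schema actually changed, avoids hammering Nakadi with schema downloads when a payload is simply corrupt.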
The intended approach is that the user manages only the Nakadi Event Type CRD, and the Apache Avro Gradle/Maven plugin generates the POJOs during the build phase:
- User creates the Avro schema using Nakadi Operator
- User adds the Avro plugin for POJO generation
- User uses the POJOs or replaces the current usage (this could be just a suitable setting for the Avro plugin to generate classes in a dedicated package)
- Nakadi client verifies the schema against the schema in the POJO
The recommended plugin is https://github.com/davidmc24/gradle-avro-plugin
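build.gradle:
...
plugins {
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}
...
apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"
...
import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

// Generate Java classes from the Avro schemas in src/main/resources/avro-schema
def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
    source("src/main/resources/avro-schema")
    outputDir = file("build/generated/sources")
}

// Compile the generated classes together with the project sources
tasks.named("compileJava").configure {
    source(generateAvro)
}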
To make migration to the binary format easier, the client can serialise POJO classes to Avro binary format using the Jackson Avro extension (jackson-dataformat-avro).
Example:
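import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

public class Employee
{
    public String name;
    public int age;
    public String[] emails;
    public Employee boss;
}

// Derive the Avro schema from the POJO
AvroMapper mapper = new AvroMapper();
AvroSchema schema = mapper.schemaFor(Employee.class);

// Serialise to Avro binary
Employee empl = new Employee();
empl.name = "Jane";
byte[] avroData = mapper.writer(schema)
    .writeValueAsBytes(empl);
// Read it back using the same schema
Employee decoded = mapper.readerFor(Employee.class)
    .with(schema)
    .readValue(avroData);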