A small example of a Flink job, written in Scala, to be deployed as a Kinesis Data Analytics application. It uses protobuf serdes and the Kinesis EFO consumer.
This example is inspired by the AWS ones, in particular the "GettingStarted" and "EfoConsumer" examples.
AWS examples:
- Java;
- Maven builder;
- String serdes for Kinesis consumer/producer;
- no explanation on how to properly run a project locally.
This example:
- Scala;
- sbt builder and assembly plugin;
- shows a way to implement custom (protobuf) Kinesis serdes;
- shows how to run the project locally, with Scala-compatible environment loading.
This example uses the sbt-protobuf plugin to generate Java classes from the proto schema definitions in main/protobuf.
These classes provide methods for serializing to and from byte arrays, which are used in the serdes.
A protobuf Kryo serializer for Flink is then registered using Chill with:
env.registerTypeWithKryoSerializer(classOf[Person], classOf[ProtobufSerializer])
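As an illustration of what a custom protobuf serde for Kinesis can look like, here is a minimal sketch built on Flink's DeserializationSchema and SerializationSchema interfaces. The class names ProtobufDeserializationSchema and ProtobufSerializationSchema are hypothetical and chosen for this sketch; the serdes in this repository may be shaped differently.

```scala
import com.google.protobuf.Message
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical names, not necessarily the ones used in this repository.
// Turns the raw bytes of a Kinesis record into a protobuf message.
// `parse` is meant to be the generated static parser of the message class;
// it is kept as a field, so it has to be serializable (Scala function
// literals are).
class ProtobufDeserializationSchema[T <: Message](
    parse: Array[Byte] => T,
    clazz: Class[T]
) extends DeserializationSchema[T] {

  override def deserialize(message: Array[Byte]): T = parse(message)

  // A Kinesis stream is unbounded, so no element marks its end.
  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeInformation.of(clazz)
}

// Turns a protobuf message into the bytes written to a Kinesis record.
class ProtobufSerializationSchema[T <: Message] extends SerializationSchema[T] {
  override def serialize(element: T): Array[Byte] = element.toByteArray
}
```

With the generated Person class, the deserializer could then be created as new ProtobufDeserializationSchema[Person](Person.parseFrom(_: Array[Byte]), classOf[Person]). The explicit Array[Byte] annotation picks the right parseFrom overload.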
Kinesis Data Analytics requires environment properties in the CreateApplication and UpdateApplication actions, under EnvironmentProperties and EnvironmentPropertiesUpdate respectively.
To load this environment while running on the cluster, it's enough to call:
KinesisAnalyticsRuntime.getApplicationProperties()
To load environment properties from a file in the resource folder, it should be enough (spoiler: it isn't) to call:
KinesisAnalyticsRuntime.getApplicationProperties(String filename)
but in Scala this doesn't work, since the Kinesis library does not use getClass.getResourceAsStream(String filename) to read the file.
config/EnvironmentLoader shows how to load the environment from the KDA runtime, with a fallback to an environment file.
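The fallback idea can be sketched roughly as follows. This is not the actual config/EnvironmentLoader code. The object name EnvironmentLoaderSketch and the default resource name are hypothetical, and the fallback file is assumed to have the same JSON shape as the KDA properties file (an array of objects with PropertyGroupId and PropertyMap). Jackson is assumed to be on the classpath.

```scala
import java.util.Properties

import scala.collection.JavaConverters._
import scala.util.Try

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical sketch, not the repository's config/EnvironmentLoader.
object EnvironmentLoaderSketch {
  type Env = Map[String, Properties]

  // Properties injected by the KDA runtime. None when the call fails or
  // returns nothing, which is what we expect outside the cluster.
  def fromRuntime(): Option[Env] =
    Try(KinesisAnalyticsRuntime.getApplicationProperties().asScala.toMap)
      .toOption
      .filter(_.nonEmpty)

  // Reads a classpath resource through getResourceAsStream.
  // Assumed shape: [{"PropertyGroupId": "...", "PropertyMap": {"k": "v"}}]
  def fromResource(name: String): Option[Env] =
    Option(getClass.getResourceAsStream(name)).map { in =>
      try {
        val root = new ObjectMapper().readTree(in)
        root.elements().asScala.map { group =>
          val props = new Properties()
          group.get("PropertyMap").fields().asScala.foreach { entry =>
            props.setProperty(entry.getKey, entry.getValue.asText())
          }
          group.get("PropertyGroupId").asText() -> props
        }.toMap
      } finally in.close()
    }

  // KDA runtime first, then the resource file, then an empty environment.
  def load(fallbackResource: String = "/application_properties.json"): Env =
    fromRuntime().orElse(fromResource(fallbackResource)).getOrElse(Map.empty)
}
```

A job would then call EnvironmentLoaderSketch.load() once at startup and look up its property groups by id, whether it runs on the cluster or locally.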
The AWS documentation asks to compile the Apache Flink Kinesis connector yourself; this is no longer necessary with the new EFO-compatible connector.
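With the EFO-compatible connector, switching the consumer to enhanced fan-out is a matter of configuration. The sketch below shows consumer properties that could enable it, assuming a flink-connector-kinesis version with EFO support is on the classpath. The object name EfoConsumerConfigSketch and the LATEST starting position are choices made for this illustration only.

```scala
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

// Hypothetical helper, named for this sketch only.
object EfoConsumerConfigSketch {

  // Builds the properties handed to the FlinkKinesisConsumer constructor.
  def efoProperties(region: String, consumerName: String): Properties = {
    val props = new Properties()
    props.setProperty(AWSConfigConstants.AWS_REGION, region)
    // Assumption: start from the latest records in the stream.
    props.setProperty(
      ConsumerConfigConstants.STREAM_INITIAL_POSITION,
      ConsumerConfigConstants.InitialPosition.LATEST.name()
    )
    // Use the EFO record publisher instead of the default polling one.
    props.setProperty(
      ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
      ConsumerConfigConstants.RecordPublisherType.EFO.name()
    )
    // Name under which the application registers as a stream consumer.
    props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, consumerName)
    props
  }
}
```

These properties are passed as the last argument of the FlinkKinesisConsumer constructor, together with the stream name and a deserialization schema.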
The flink-scala and flink-streaming-scala libraries need to be marked as "provided"; this means that, if we run the project locally, we are not going to find their classes in the classpath.
To have these libraries loaded locally, run:
sbt clean "project flinkEfoConsumerLocal" run
This project extends the base one, removing the provided configuration from the dependencies.
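A build.sbt along these lines is one way to get that behaviour. It is a sketch under assumptions, not a copy of this repository's build: the version numbers, the "local" directory and the main class name are hypothetical, and sbt 1.x syntax is assumed.

```scala
// build.sbt (sketch). Versions, paths and the main class are assumptions.
ThisBuild / scalaVersion := "2.12.15"

val flinkVersion = "1.13.2"

lazy val root = (project in file("."))
  .settings(
    name := "flink-efo-consumer",
    libraryDependencies ++= Seq(
      // Supplied by the Flink cluster at runtime, so kept out of the fat JAR.
      "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
    )
  )

// Same code and dependencies as the base project, but with the "provided"
// scope dropped so the Flink classes are on the classpath for a local run.
lazy val flinkEfoConsumerLocal = (project in file("local"))
  .dependsOn(root)
  .settings(
    libraryDependencies := (root / libraryDependencies).value.map { module =>
      module.configurations match {
        case Some("provided") => module.withConfigurations(None)
        case _                => module
      }
    },
    // Hypothetical main class name; point it at the job's entry point.
    Compile / run / mainClass := Some("com.example.FlinkEfoConsumerJob")
  )
```

Keeping the "provided" scope in the base project means the assembled JAR stays small and does not clash with the Flink classes already present on the cluster.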
This example uses the sbt-assembly plugin; to build the fat JAR, just run:
sbt clean assembly