Custom TypeInfo example (#71)
* Custom TypeInfo example
* Added more serialization cases, as tests
nicusX authored Oct 30, 2024
1 parent 540208e commit 36473fe
Showing 16 changed files with 1,203 additions and 10 deletions.
118 changes: 118 additions & 0 deletions java/Serialization/CustomTypeInfo/README.md
@@ -0,0 +1,118 @@
## Using custom TypeInformation to avoid serialization falling back to Kryo

This example shows how to define custom TypeInformation for your record objects to prevent Flink from falling back to
Kryo serialization.

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: Kinesis Sink

The application generates random data internally and, after an aggregation, writes the result to Kinesis.

### Flink serialization

This example illustrates how to define custom TypeInfo for the objects used internally in the application or stored in
the application state.

#### Background

Every object handed over between the operators of the application, or stored in the application state, is serialized using
Flink's serialization mechanism.

Flink can efficiently serialize most simple Java types, as well as POJOs whose fields are basic types.

When Flink cannot natively serialize an object, it falls back to using [Kryo](https://github.com/EsotericSoftware/kryo).
Unfortunately, Kryo serialization is less efficient and has a considerable impact on performance, in particular on CPU
utilization.

One important case where Flink cannot natively serialize a field is collections. Due to Java type erasure, Flink
cannot discover the type of the elements of the collection and falls back to Kryo to serialize it.
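
For illustration, a field like the one in the following sketch (a hypothetical class, not part of this example) would be serialized with Kryo:

```java
import java.util.HashMap;
import java.util.Map;

public class VehicleEventWithoutTypeInfo {

    // Without any explicit type information, Flink treats this field as a GenericType
    // and serializes it with Kryo.
    private Map<String, Long> sensorData = new HashMap<>();

    // getters and setters omitted
}
```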

#### Defining a TypeInfo

To prevent this, you can explicitly define the type of the collection elements using the `@TypeInfo` annotation and
a custom `TypeInfoFactory`.
This is demonstrated in these two record classes:
[`VehicleEvent`](src/main/java/com/amazonaws/services/msf/domain/VehicleEvent.java)
and [`AggregateVehicleEvent`](src/main/java/com/amazonaws/services/msf/domain/AggregateVehicleEvent.java)

```java
public class VehicleEvent {
    //...
    @TypeInfo(SensorDataTypeInfoFactory.class)
    private Map<String, Long> sensorData = new HashMap<>();
    //...

    public static class SensorDataTypeInfoFactory extends TypeInfoFactory<Map<String, Long>> {
        @Override
        public TypeInformation<Map<String, Long>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
            return new MapTypeInfo<>(Types.STRING, Types.LONG);
        }
    }
}
```

For more details about Flink serialization, see [Data Types & Serialization](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#data-types--serialization)
in the Flink documentation.

#### Testing serialization

You can test that the serialization of your objects does not fall back to Kryo, using the
`PojoTestUtils.assertSerializedAsPojoWithoutKryo(...)` method, which is part of Flink test-utils
(the `org.apache.flink:flink-test-utils` dependency).

This assertion succeeds only if the serialization does NOT fall back to Kryo.
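
As a minimal sketch, assuming the `VehicleEvent` class from this example, such a test may look like this (the test class name is illustrative):

```java
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;

class VehicleEventPojoTest {

    @Test
    void shouldSerializeVehicleEventAsPojoWithoutKryo() {
        // Fails if Flink would serialize the class, or any of its fields, with Kryo
        PojoTestUtils.assertSerializedAsPojoWithoutKryo(VehicleEvent.class);
    }
}
```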

However, if you make a mistake in the definition of `TypeInfoFactory`, the assertion above may succeed, but the actual
serialization may fail or, even worse, have unpredictable results.

To prevent this type of bug, you can test that the serialization actually works.

These tests are demonstrated in two unit tests:
[`VehicleEventSerializationTest`](src/test/java/com/amazonaws/services/msf/domain/VehicleEventSerializationTest.java)
and [`AggregateVehicleEventSerializationTest`](src/test/java/com/amazonaws/services/msf/domain/AggregateVehicleEventSerializationTest.java).

These tests use a test utility class that can be reused to test any record class:
[`FlinkSerializationTestUtils`](src/test/java/com/amazonaws/services/msf/domain/FlinkSerializationTestUtils.java)
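
The core of such a round-trip test is sketched below. This is a simplified sketch, not the actual utility class; it assumes the record class implements `equals()`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

public class SerializationRoundTrip {

    public static <T> T roundTrip(T original, Class<T> clazz) throws Exception {
        // Create the serializer Flink would use for this type
        TypeSerializer<T> serializer = TypeInformation.of(clazz)
                .createSerializer(new ExecutionConfig());

        // Serialize to bytes...
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        serializer.serialize(original, new DataOutputViewStreamWrapper(out));

        // ...and deserialize back
        return serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray())));
    }
}
```

A test can then assert that the deserialized copy equals the original, for example `assertEquals(event, SerializationRoundTrip.roundTrip(event, VehicleEvent.class))`.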

#### More serialization cases

The test class [`MoreKryoSerializationExamplesTest`](src/test/java/com/amazonaws/services/msf/domain/MoreKryoSerializationExamplesTest.java)
illustrates more cases where serialization does or does not fall back to Kryo.

Serialization tests are `@Disabled` for those cases where Kryo is used. You can enable these tests to observe how they
actually catch the Kryo fallback.

In particular:
* Any collection falls back to Kryo. You can create a custom `TypeInfoFactory` for `List` and `Map` (see the sketch after this list); there is no `SetTypeInfo<T>`, though.
* Some non-basic types like `java.time.Instant` serialize nicely without Kryo.
* Most `java.time.*` types require Kryo.
* Nested POJOs serialize without Kryo, as long as each component does not require Kryo.
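
For example, a custom `TypeInfoFactory` for a `List` field could follow the sketch below, using Flink's built-in `ListTypeInfo` (the `TaggedEvent` class and its `tags` field are hypothetical, not part of this example):

```java
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ListTypeInfo;

public class TaggedEvent {

    @TypeInfo(TagsTypeInfoFactory.class)
    private List<String> tags = new ArrayList<>();

    // Tells Flink to serialize the field as a list of Strings, instead of falling back to Kryo
    public static class TagsTypeInfoFactory extends TypeInfoFactory<List<String>> {
        @Override
        public TypeInformation<List<String>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
            return new ListTypeInfo<>(Types.STRING);
        }
    }
}
```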


### Runtime configuration

When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.

Runtime parameters:

| Group ID        | Key               | Description                                                                                                                                        |
|-----------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
| `DataGen`       | `records.per.sec` | (optional) Number of generated records per second. Default = 100.                                                                                 |
| `OutputStream0` | `stream.arn`      | ARN of the output stream.                                                                                                                          |
| `OutputStream0` | `aws.region`      | Region of the output stream. If not specified, it will use the application region or the default region of the AWS profile, when running locally. |

All parameters are case-sensitive.
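
When running locally, the [`flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file may look like the following sketch; the stream ARN and region below are placeholders:

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "100"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.arn": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```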

### Running locally in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.

179 changes: 179 additions & 0 deletions java/Serialization/CustomTypeInfo/pom.xml
@@ -0,0 +1,179 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>custom-typeinfo</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.version>1.20.0</flink.version>
        <aws.connector.version>4.3.0-1.19</aws.connector.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Connectors and Formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>${aws.connector.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- Tests -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${junit5.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>${junit5.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}-${flink.version}</finalName>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.CustomTypeInfoJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>