Merge pull request #14 from NickBurkard/master
Support Scala 2.13 & Flink 1.15
shuttie authored May 2, 2022
2 parents 85920b0 + d6725ee commit 5ff84ab
Showing 38 changed files with 1,767 additions and 175 deletions.
17 changes: 9 additions & 8 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ more Scala-specific TypeSerializer & TypeInformation derivation support.
in the current scope
* has no silent fallback to Kryo: it will just fail the compilation in a case when serializer cannot be made
* reuses all the low-level serialization code from Flink for basic Java and Scala types
* supports Scala 2.12 & 2.13.
* built for Flink 1.15, which supports arbitrary Scala versions.

* as this project relies on macro to derive TypeSerializer instances, if you're using IntelliJ 2020.*, it may
highlight your code with red, hinting that it cannot find corresponding implicits. And this is fine, the code
compiles OK. 2021 is fine with serializers derived with this library.
* this library is built for Flink 1.14, but marks the `flink-*` dependencies as `provided`, so it should also work with earlier
* Supports only Scala 2.12: underlying Magnolia library has no support for 2.11

## Usage

Expand All @@ -34,14 +33,16 @@ To use this library, swap `import org.apache.flink.api.scala._` with `import io.
So to derive a TypeInformation for a sealed trait, you can do:
import io.findify.flinkadt.api._
import org.apache.flink.api.common.typeinfo.TypeInformation

sealed trait Event
case class Click(id: String) extends Event
case class Purchase(price: Double) extends Event
sealed trait Event extends Product with Serializable

// env is a StreamingExecutionEnvironment
val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10)
object Event {
final case class Click(id: String) extends Event
final case class Purchase(price: Double) extends Event

implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation

Be careful with a wildcard import of `import org.apache.flink.api.scala._`: it has a `createTypeInformation` implicit
20 changes: 12 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ name := "flink-adt"

version := "0.4.5"

scalaVersion := "2.12.15"
lazy val `scala 2.12` = "2.12.15"
lazy val `scala 2.13` = "2.13.8"

scalaVersion := `scala 2.13`
crossScalaVersions := Seq(`scala 2.12`, `scala 2.13`)

organization := "io.findify"
licenses := Seq("MIT" -> url(""))
Expand All @@ -12,15 +16,15 @@ publishMavenStyle := true

publishTo := sonatypePublishToBundle.value

lazy val flinkVersion = "1.14.0"
lazy val flinkVersion = "1.15.0"

libraryDependencies ++= Seq(
"com.softwaremill.magnolia1_2" % "magnolia_2.12" % "1.0.0-M7",
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
"org.scalatest" %% "scalatest" % "3.2.10" % "test",
"org.typelevel" %% "cats-core" % "2.7.0" % "test"
"com.softwaremill.magnolia1_2" %% "magnolia" % "1.1.2",
"org.apache.flink" % "flink-java" % flinkVersion % "provided",
"org.apache.flink" % "flink-test-utils" % flinkVersion % "test",
"org.scalatest" %% "scalatest" % "3.2.11" % "test",
"org.typelevel" %% "cats-core" % "2.7.0" % "test",
"org.scala-lang" % "scala-reflect" % scalaVersion.value

scmInfo := Some(
2 changes: 1 addition & 1 deletion project/
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.5.5
sbt.version = 1.6.1
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.2")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.7")
addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.2")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.7")
addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.20")
@@ -0,0 +1,110 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package io.findify.flinkadt.api.serializer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;

import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/** {@link TypeSerializerSnapshot} for {@link ScalaCaseClassSerializer}. */
public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product>
extends CompositeTypeSerializerSnapshot<T, ScalaCaseClassSerializer<T>> {

private static final int VERSION = 2;

private Class<T> type;

/** Used via reflection. */
public ScalaCaseClassSerializerSnapshot() {

* Used for delegating schema compatibility checks from serializers that were previously using
* {@code TupleSerializerConfigSnapshot}. Type is the {@code outerSnapshot} information, that is
* required to perform {@link #internalResolveSchemaCompatibility(TypeSerializer,
* TypeSerializerSnapshot[])}.
* <p>This is used in {@link
* ScalaCaseClassSerializer#resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(TypeSerializerConfigSnapshot)}.
ScalaCaseClassSerializerSnapshot(Class<T> type) {
this.type = checkNotNull(type, "type can not be NULL");

/** Used for the snapshot path. */
public ScalaCaseClassSerializerSnapshot(ScalaCaseClassSerializer<T> serializerInstance) {
this.type = checkNotNull(serializerInstance.getTupleClass(), "tuple class can not be NULL");

protected int getCurrentOuterSnapshotVersion() {
return VERSION;

protected TypeSerializer<?>[] getNestedSerializers(
ScalaCaseClassSerializer<T> outerSerializer) {
return outerSerializer.getFieldSerializers();

protected ScalaCaseClassSerializer<T> createOuterSerializerWithNestedSerializers(
TypeSerializer<?>[] nestedSerializers) {
checkState(type != null, "type can not be NULL");
return new ScalaCaseClassSerializer<>(type, nestedSerializers);

protected void writeOuterSnapshot(DataOutputView out) throws IOException {
checkState(type != null, "type can not be NULL");

protected void readOuterSnapshot(
int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {
this.type = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);

protected CompositeTypeSerializerSnapshot.OuterSchemaCompatibility
resolveOuterSchemaCompatibility(ScalaCaseClassSerializer<T> newSerializer) {
return (Objects.equals(type, newSerializer.getTupleClass()))
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
@@ -0,0 +1,69 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package io.findify.flinkadt.api.serializer;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import scala.util.Either;

* Configuration snapshot for serializers of Scala's {@link Either} type, containing configuration
* snapshots of the Left and Right serializers.
public class ScalaEitherSerializerSnapshot<L, R>
extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> {

private static final int CURRENT_VERSION = 1;

/** Constructor for read instantiation. */
public ScalaEitherSerializerSnapshot() {

/** Constructor to create the snapshot for writing. */
public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {

public int getCurrentOuterSnapshotVersion() {

protected EitherSerializer<L, R> createOuterSerializerWithNestedSerializers(
TypeSerializer<?>[] nestedSerializers) {
TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0];

TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1];

return new EitherSerializer<>(leftSerializer, rightSerializer);

protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer<L, R> outerSerializer) {
return new TypeSerializer<?>[] {
outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer()

@@ -0,0 +1,60 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package io.findify.flinkadt.api.serializer;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

import scala.Option;

/** A {@link TypeSerializerSnapshot} for the Scala {@link OptionSerializer}. */
public final class ScalaOptionSerializerSnapshot<E>
extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {

private static final int VERSION = 2;

public ScalaOptionSerializerSnapshot() {

public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) {

protected int getCurrentOuterSnapshotVersion() {
return VERSION;

protected TypeSerializer<?>[] getNestedSerializers(OptionSerializer<E> outerSerializer) {
return new TypeSerializer[] {outerSerializer.elemSerializer()};

protected OptionSerializer<E> createOuterSerializerWithNestedSerializers(
TypeSerializer<?>[] nestedSerializers) {
TypeSerializer<E> nestedSerializer = (TypeSerializer<E>) nestedSerializers[0];
return new OptionSerializer<>(nestedSerializer);

Expand Up @@ -2,7 +2,6 @@ package io.findify.flinkadt.api

import magnolia1.Magnolia
import org.apache.flink.api.common.typeinfo.TypeInformation
import scala.language.experimental.macros

trait LowPrioImplicits {
implicit def deriveTypeInformation[T]: TypeInformation[T] = macro Magnolia.gen[T]
Expand Down

