Skip to content

Commit

Permalink
Merge pull request #7 from findify/fix/stackoverflow-on-list
Browse files Browse the repository at this point in the history
fix issue with :: stackoverflow
  • Loading branch information
shuttie authored Jun 16, 2021
2 parents 7cb1b45 + b4c70a3 commit 1baae7d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 3 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ For the sealed trait membership itself, `flink-adt` used an own serialization fo
* you cannot remove ADT members
* you cannot replace ADT members

## Compatibility

This project uses a separate set of serializers for collections, instead of Flink's own TraversableSerializer. So probably you
may have issues while migrating state snapshots from TraversableSerializer to FlinkADT ones.

## Licence

The MIT License (MIT)
Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name := "flink-adt"

version := "0.4.1"
version := "0.4.2"

scalaVersion := "2.12.13"
scalaVersion := "2.12.14"

organization := "io.findify"
licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT"))
Expand All @@ -12,7 +12,7 @@ publishMavenStyle := true

publishTo := sonatypePublishToBundle.value

lazy val flinkVersion = "1.13.0"
lazy val flinkVersion = "1.13.1"

libraryDependencies ++= Seq(
"com.propensive" %% "magnolia" % "0.17.0",
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/io/findify/flinkadt/api/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ package object api extends LowPrioImplicits {
new OptionSerializer[T](vs)
implicit def listSerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[List[T]] =
new ListSerializer[T](vs, classTag[T].runtimeClass.asInstanceOf[Class[T]])
implicit def listCCSerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[::[T]] =
new ListCCSerializer[T](vs, classTag[T].runtimeClass.asInstanceOf[Class[T]])

implicit def vectorSerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[Vector[T]] =
new VectorSerializer[T](vs, classTag[T].runtimeClass.asInstanceOf[Class[T]])
implicit def arraySerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[Array[T]] =
Expand Down Expand Up @@ -198,6 +201,9 @@ package object api extends LowPrioImplicits {
implicit lazy val jCharInfo: TypeInformation[java.lang.Character] = BasicTypeInfo.CHAR_TYPE_INFO
implicit lazy val jShortInfo: TypeInformation[java.lang.Short] = BasicTypeInfo.SHORT_TYPE_INFO

implicit def listCCInfo[T: ClassTag](implicit ls: TypeSerializer[::[T]]): TypeInformation[::[T]] =
new CollectionTypeInformation[::[T]](ls)

implicit def listInfo[T: ClassTag](implicit ls: TypeSerializer[List[T]]): TypeInformation[List[T]] =
new CollectionTypeInformation[List[T]](ls)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.findify.flinkadt.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends SimpleSerializer[::[T]] {
override def createInstance(): ::[T] = throw new IllegalArgumentException("cannot create instance of non-empty list")
override def getLength: Int = -1
override def deserialize(source: DataInputView): ::[T] = {
val count = source.readInt()
val result = for {
_ <- 0 until count
} yield {
child.deserialize(source)
}
::(result.head, result.tail.toList)
}
override def serialize(record: ::[T], target: DataOutputView): Unit = {
target.writeInt(record.size)
record.foreach(element => child.serialize(element, target))
}
override def snapshotConfiguration(): TypeSerializerSnapshot[::[T]] =
new CollectionSerializerSnapshot(child, classOf[ListCCSerializer[T]], clazz)

}
16 changes: 16 additions & 0 deletions src/test/scala/io/findify/flinkadt/SerializerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.findify.flinkadt.SerializerTest.{
Bar2,
Foo,
Foo2,
ListADT,
Nested,
Node,
P2,
Expand Down Expand Up @@ -91,6 +92,13 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
serializable(ser)
}

it should "derive list of ADT" in {
val ser = implicitly[TypeInformation[List[ADT]]].createSerializer(null)
all(ser, List(Foo("a")))
roundtrip(ser, ::(Foo("a"), Nil))
roundtrip(ser, Nil)
}

it should "derive recursively" in {
// recursive is broken
//val ti = implicitly[TypeInformation[Node]]
Expand Down Expand Up @@ -128,6 +136,12 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
roundtrip(ser, SimpleOption(Some("foo")))
}

it should "serialize nested list of ADT" in {
val ser = implicitly[TypeInformation[ListADT]].createSerializer(null)
all(ser, ListADT(Nil))
roundtrip(ser, ListADT(List(Foo("a"))))
}

def roundtrip[T](ser: TypeSerializer[T], in: T) = {
val out = new ByteArrayOutputStream()
ser.serialize(in, new DataOutputViewStreamWrapper(out))
Expand Down Expand Up @@ -204,4 +218,6 @@ object SerializerTest {
case class Node(left: Option[Node], right: Option[Node])

case class SimpleOption(a: Option[String])

case class ListADT(a: List[ADT])
}

0 comments on commit 1baae7d

Please sign in to comment.