From b4c70a3eeeb449434ba37478476775ff365511c5 Mon Sep 17 00:00:00 2001 From: Roman Grebennikov Date: Wed, 16 Jun 2021 16:47:36 +0300 Subject: [PATCH] fix issue with :: stackoverflow --- README.md | 5 ++++ build.sbt | 6 ++--- .../io/findify/flinkadt/api/package.scala | 6 +++++ .../api/serializer/ListCCSerializer.scala | 25 +++++++++++++++++++ .../io/findify/flinkadt/SerializerTest.scala | 16 ++++++++++++ 5 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/io/findify/flinkadt/api/serializer/ListCCSerializer.scala diff --git a/README.md b/README.md index b8755ee..642f41d 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,11 @@ For the sealed trait membership itself, `flink-adt` used an own serialization fo * you cannot remove ADT members * you cannot replace ADT members +## Compatibility + +This project uses a separate set of serializers for collections, instead of Flink's own TraversableSerializer. So probably you +may have issues while migrating state snapshots from TraversableSerializer to FlinkADT ones. + ## Licence The MIT License (MIT) diff --git a/build.sbt b/build.sbt index ed4c011..98f7b63 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ name := "flink-adt" -version := "0.4.1" +version := "0.4.2" -scalaVersion := "2.12.13" +scalaVersion := "2.12.14" organization := "io.findify" licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT")) @@ -12,7 +12,7 @@ publishMavenStyle := true publishTo := sonatypePublishToBundle.value -lazy val flinkVersion = "1.13.0" +lazy val flinkVersion = "1.13.1" libraryDependencies ++= Seq( "com.propensive" %% "magnolia" % "0.17.0", diff --git a/src/main/scala/io/findify/flinkadt/api/package.scala b/src/main/scala/io/findify/flinkadt/api/package.scala index 4ec7cba..379708c 100644 --- a/src/main/scala/io/findify/flinkadt/api/package.scala +++ b/src/main/scala/io/findify/flinkadt/api/package.scala @@ -124,6 +124,9 @@ package object api extends LowPrioImplicits { new OptionSerializer[T](vs) implicit def listSerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[List[T]] = new ListSerializer[T](vs, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + implicit def listCCSerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[::[T]] = + new ListCCSerializer[T](vs, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + implicit def vectorSerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[Vector[T]] = new VectorSerializer[T](vs, classTag[T].runtimeClass.asInstanceOf[Class[T]]) implicit def arraySerializer[T: ClassTag](implicit vs: TypeSerializer[T]): TypeSerializer[Array[T]] = @@ -198,6 +201,9 @@ package object api extends LowPrioImplicits { implicit lazy val jCharInfo: TypeInformation[java.lang.Character] = BasicTypeInfo.CHAR_TYPE_INFO implicit lazy val jShortInfo: TypeInformation[java.lang.Short] = BasicTypeInfo.SHORT_TYPE_INFO + implicit def listCCInfo[T: ClassTag](implicit ls: TypeSerializer[::[T]]): TypeInformation[::[T]] = + new CollectionTypeInformation[::[T]](ls) + implicit def listInfo[T: ClassTag](implicit ls: TypeSerializer[List[T]]): TypeInformation[List[T]] = new CollectionTypeInformation[List[T]](ls) diff --git a/src/main/scala/io/findify/flinkadt/api/serializer/ListCCSerializer.scala b/src/main/scala/io/findify/flinkadt/api/serializer/ListCCSerializer.scala new file mode 100644 index 0000000..bf30ec0 --- /dev/null +++ b/src/main/scala/io/findify/flinkadt/api/serializer/ListCCSerializer.scala @@ -0,0 +1,25 @@ +package io.findify.flinkadt.api.serializer + +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} + +class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends SimpleSerializer[::[T]] { + override def createInstance(): ::[T] = throw new IllegalArgumentException("cannot create instance of non-empty list") + override def getLength: Int = -1 + override def deserialize(source: DataInputView): ::[T] = { + val count = source.readInt() + val result = for { + _ <- 0 until count + } yield { + child.deserialize(source) + } + ::(result.head, result.tail.toList) + } + override def serialize(record: ::[T], target: DataOutputView): Unit = { + target.writeInt(record.size) + record.foreach(element => child.serialize(element, target)) + } + override def snapshotConfiguration(): TypeSerializerSnapshot[::[T]] = + new CollectionSerializerSnapshot(child, classOf[ListCCSerializer[T]], clazz) + +} diff --git a/src/test/scala/io/findify/flinkadt/SerializerTest.scala b/src/test/scala/io/findify/flinkadt/SerializerTest.scala index 4533dbd..66b5839 100644 --- a/src/test/scala/io/findify/flinkadt/SerializerTest.scala +++ b/src/test/scala/io/findify/flinkadt/SerializerTest.scala @@ -12,6 +12,7 @@ import io.findify.flinkadt.SerializerTest.{ Bar2, Foo, Foo2, + ListADT, Nested, Node, P2, @@ -91,6 +92,13 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors { serializable(ser) } + it should "derive list of ADT" in { + val ser = implicitly[TypeInformation[List[ADT]]].createSerializer(null) + all(ser, List(Foo("a"))) + roundtrip(ser, ::(Foo("a"), Nil)) + roundtrip(ser, Nil) + } + it should "derive recursively" in { // recursive is broken //val ti = implicitly[TypeInformation[Node]] @@ -128,6 +136,12 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors { roundtrip(ser, SimpleOption(Some("foo"))) } + it should "serialize nested list of ADT" in { + val ser = implicitly[TypeInformation[ListADT]].createSerializer(null) + all(ser, ListADT(Nil)) + roundtrip(ser, ListADT(List(Foo("a")))) + } + def roundtrip[T](ser: TypeSerializer[T], in: T) = { val out = new ByteArrayOutputStream() ser.serialize(in, new DataOutputViewStreamWrapper(out)) @@ -204,4 +218,6 @@ object SerializerTest { case class Node(left: Option[Node], right: Option[Node]) case class SimpleOption(a: Option[String]) + + case class ListADT(a: List[ADT]) }