From e663343599db49640df26022313e2602bbaeff37 Mon Sep 17 00:00:00 2001 From: Grebennikov Roman Date: Thu, 26 Aug 2021 14:01:49 +0200 Subject: [PATCH] fix cache leak --- README.md | 8 +-- build.sbt | 5 +- .../io/findify/flinkadt/api/package.scala | 11 +++- .../scala/io/findify/flinkadt/AnyTest.scala | 59 +++++++++++++++++++ .../scala/io/findify/flinkadt/CatsTest.scala | 17 ++++++ .../io/findify/flinkadt/ExampleTest.scala | 3 +- .../io/findify/flinkadt/SerializerTest.scala | 48 ++++++--------- .../scala/io/findify/flinkadt/TestUtils.scala | 42 +++++++++++++ 8 files changed, 152 insertions(+), 41 deletions(-) create mode 100644 src/test/scala/io/findify/flinkadt/AnyTest.scala create mode 100644 src/test/scala/io/findify/flinkadt/CatsTest.scala create mode 100644 src/test/scala/io/findify/flinkadt/TestUtils.scala diff --git a/README.md b/README.md index 12251bc..4dd3dac 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Scala ADT support for Apache Flink -[![CI Status](https://github.com/findify/flink-adt/workflows/CI/badge.svg)](https://github.com/metarank/cfor/actions) +[![CI Status](https://github.com/findify/flink-adt/workflows/CI/badge.svg)](https://github.com/findify/flink-adt/actions) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.findify/flink-adt_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/io.github.metarank/cfor_2.13) -[![License: Apache 2](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) +[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) This is a prototype of Magnolia-based serializer framework for Apache Flink, with more Scala-specific TypeSerializer & TypeInformation derivation support. @@ -26,7 +26,7 @@ versions `flink-adt` is released to Maven-central. For SBT, add this snippet to `build.sbt`: ```scala -libraryDependencies += "io.findify" %% "flink-adt" % "0.4.2" +libraryDependencies += "io.findify" %% "flink-adt" % "0.4.3" ``` To use this library, swap `import org.apache.flink.api.scala._` with `import io.findify.flinkadt.api._` and enjoy. @@ -69,7 +69,7 @@ may have issues while migrating state snapshots from TraversableSerializer to Fl The MIT License (MIT) -Copyright (c) 2019 Findify AB +Copyright (c) 2021 Findify AB Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/build.sbt b/build.sbt index 98f7b63..6eab8c2 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "flink-adt" -version := "0.4.2" +version := "0.4.3" scalaVersion := "2.12.14" @@ -19,7 +19,8 @@ libraryDependencies ++= Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test", - "org.scalatest" %% "scalatest" % "3.2.9" % "test" + "org.scalatest" %% "scalatest" % "3.2.9" % "test", + "org.typelevel" %% "cats-core" % "2.3.0" % "test" ) scmInfo := Some( diff --git a/src/main/scala/io/findify/flinkadt/api/package.scala b/src/main/scala/io/findify/flinkadt/api/package.scala index 3c39fb7..0c25f4c 100644 --- a/src/main/scala/io/findify/flinkadt/api/package.scala +++ b/src/main/scala/io/findify/flinkadt/api/package.scala @@ -42,7 +42,7 @@ package object api extends LowPrioImplicits { def combine[T <: Product: ClassTag: TypeTag]( ctx: CaseClass[TypeInformation, T] ): TypeInformation[T] = { - val cacheKey = s"${ctx.typeName.full}_${ctx.typeName.typeArguments}" + val cacheKey = typeName(ctx.typeName) cache.get(cacheKey) match { case Some(cached) => cached.asInstanceOf[TypeInformation[T]] case None => @@ -69,7 +69,8 @@ package object api extends LowPrioImplicits { def dispatch[T: ClassTag]( ctx: SealedTrait[TypeInformation, T] ): TypeInformation[T] = { - cache.get(ctx.typeName.full) match { + val cacheKey = typeName(ctx.typeName) + cache.get(cacheKey) match { case Some(cached) => cached.asInstanceOf[TypeInformation[T]] case None => val serializer = new CoproductSerializer[T]( @@ -83,11 +84,15 @@ package object api extends LowPrioImplicits { ) val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]] val ti = new CoproductTypeInformation[T](clazz, serializer) - cache.put(ctx.typeName.full, ti) + cache.put(cacheKey, ti) ti } } + private def typeName(tn: magnolia.TypeName): String = { + s"${tn.full}[${tn.typeArguments.map(typeName).mkString(",")}]" + } + private def loadClass(name: String): Option[Class[_]] = { val sanitized = name.replaceAllLiterally("::", "$colon$colon") Try(Class.forName(sanitized)) match { diff --git a/src/test/scala/io/findify/flinkadt/AnyTest.scala b/src/test/scala/io/findify/flinkadt/AnyTest.scala new file mode 100644 index 0000000..b319f85 --- /dev/null +++ b/src/test/scala/io/findify/flinkadt/AnyTest.scala @@ -0,0 +1,59 @@ +package io.findify.flinkadt + +import cats.data.NonEmptyList +import io.findify.flinkadt.AnyTest.FAny +import io.findify.flinkadt.AnyTest.Filter.{FTerm, StringTerm, TermFilter} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +class AnyTest extends AnyFlatSpec with Matchers with TestUtils { + import io.findify.flinkadt.api._ + + it should "serialize concrete class" in { + val ser = implicitly[TypeInformation[StringTerm]].createSerializer(null) + roundtrip(ser, StringTerm("fo")) + } + + it should "serialize ADT" in { + val ser = implicitly[TypeInformation[FAny]].createSerializer(null) + roundtrip(ser, StringTerm("fo")) + } + + it should "serialize NEL" in { + val ser = implicitly[TypeInformation[NonEmptyList[FTerm]]].createSerializer(null) + roundtrip(ser, NonEmptyList.one(StringTerm("fo"))) + } + + it should "serialize nested nel" in { + val ser = implicitly[TypeInformation[TermFilter]].createSerializer(null) + roundtrip(ser, TermFilter("a", NonEmptyList.one(StringTerm("fo")))) + } + +} + +object AnyTest { + sealed trait FAny + + sealed trait FValueAny extends FAny { + def value: Any + } + object Filter { + sealed trait FTerm extends FValueAny + case class StringTerm(value: String) extends FTerm { + type T = String + } + case class NumericTerm(value: Double) extends FTerm { + type T = Double + } + + case class TermFilter( + field: String, + values: NonEmptyList[FTerm] + ) + } +} diff --git a/src/test/scala/io/findify/flinkadt/CatsTest.scala b/src/test/scala/io/findify/flinkadt/CatsTest.scala new file mode 100644 index 0000000..c6aca95 --- /dev/null +++ b/src/test/scala/io/findify/flinkadt/CatsTest.scala @@ -0,0 +1,17 @@ +package io.findify.flinkadt + +import cats.data.NonEmptyList +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class CatsTest extends AnyFlatSpec with Matchers with TestUtils { + import io.findify.flinkadt.api._ + it should "derive for NEL[String]" in { + val ser = deriveTypeInformation[NonEmptyList[String]].createSerializer(null) + roundtrip(ser, NonEmptyList.one("doo")) + } + it should "derive for NEL[Int]" in { + val ser = deriveTypeInformation[NonEmptyList[Int]].createSerializer(null) + roundtrip(ser, NonEmptyList.one(1)) + } +} diff --git a/src/test/scala/io/findify/flinkadt/ExampleTest.scala b/src/test/scala/io/findify/flinkadt/ExampleTest.scala index 74524b3..51861f3 100644 --- a/src/test/scala/io/findify/flinkadt/ExampleTest.scala +++ b/src/test/scala/io/findify/flinkadt/ExampleTest.scala @@ -41,8 +41,7 @@ class ExampleTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll { import io.findify.flinkadt.api._ implicit val eventTypeInfo = deriveTypeInformation[Event] - - val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10) + val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10) result.size shouldBe 2 } diff --git a/src/test/scala/io/findify/flinkadt/SerializerTest.scala b/src/test/scala/io/findify/flinkadt/SerializerTest.scala index e5e0f8b..d3d4255 100644 --- a/src/test/scala/io/findify/flinkadt/SerializerTest.scala +++ b/src/test/scala/io/findify/flinkadt/SerializerTest.scala @@ -10,11 +10,13 @@ import io.findify.flinkadt.SerializerTest.{ Annotated, Bar, Bar2, + BoundADT, Foo, Foo2, Generic, ListADT, Nested, + NestedParent, Node, P2, Param, @@ -36,7 +38,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import io.findify.flinkadt.api._ -class SerializerTest extends AnyFlatSpec with Matchers with Inspectors { +class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils { import io.findify.flinkadt.api._ it should "derive serializer for simple class" in { @@ -121,6 +123,16 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors { serializable(ser) } + it should "derive generic type bounded classes" in { + val ser = implicitly[TypeInformation[BoundADT[Foo]]].createSerializer(null) + noKryo(ser) + } + +// it should "derive nested generic type bounded classes" in { +// val ser = deriveTypeInformation[NestedParent].createSerializer(null) +// noKryo(ser) +// } + it should "be serializable in case of annotations on classes" in { val ser = implicitly[TypeInformation[Annotated]].createSerializer(null) serializable(ser) @@ -150,35 +162,6 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors { all(ser2, Generic(Simple(0, "asd"), Bar(0))) } - def roundtrip[T](ser: TypeSerializer[T], in: T) = { - val out = new ByteArrayOutputStream() - ser.serialize(in, new DataOutputViewStreamWrapper(out)) - val copy = ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))) - in shouldBe copy - } - - def noKryo[T](ser: TypeSerializer[T]): Unit = - ser match { - case p: ScalaCaseClassSerializer[_] => - forAll(p.getFieldSerializers) { param => - noKryo(param) - } - case _: KryoSerializer[_] => - throw new IllegalArgumentException("kryo detected") - case _ => // ok - } - - def serializable[T](ser: TypeSerializer[T]) = { - val stream = new ObjectOutputStream(new ByteArrayOutputStream()) - stream.writeObject(ser) - } - - def all[T](ser: TypeSerializer[T], in: T) = { - roundtrip(ser, in) - noKryo(ser) - serializable(ser) - } - } object SerializerTest { @@ -200,6 +183,11 @@ object SerializerTest { case class WrappedADT(x: ADT) + case class BoundADT[T <: ADT](x: T) + + sealed trait NestedParent + case class NestedBoundADT[T <: ADT](x: T) extends NestedParent + @SerialVersionUID(1L) case class Annotated(foo: String) diff --git a/src/test/scala/io/findify/flinkadt/TestUtils.scala b/src/test/scala/io/findify/flinkadt/TestUtils.scala new file mode 100644 index 0000000..fb0139f --- /dev/null +++ b/src/test/scala/io/findify/flinkadt/TestUtils.scala @@ -0,0 +1,42 @@ +package io.findify.flinkadt + +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer +import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.scalatest.{Inspectors, Suite} +import org.scalatest.matchers.should.Matchers + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream} + +trait TestUtils extends Matchers with Inspectors { + def roundtrip[T](ser: TypeSerializer[T], in: T) = { + val out = new ByteArrayOutputStream() + ser.serialize(in, new DataOutputViewStreamWrapper(out)) + val copy = ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))) + in shouldBe copy + } + + def noKryo[T](ser: TypeSerializer[T]): Unit = + ser match { + case p: ScalaCaseClassSerializer[_] => + forAll(p.getFieldSerializers) { param => + noKryo(param) + } + case _: KryoSerializer[_] => + throw new IllegalArgumentException("kryo detected") + case _ => // ok + } + + def serializable[T](ser: TypeSerializer[T]) = { + val stream = new ObjectOutputStream(new ByteArrayOutputStream()) + stream.writeObject(ser) + } + + def all[T](ser: TypeSerializer[T], in: T) = { + roundtrip(ser, in) + noKryo(ser) + serializable(ser) + } + +}