Skip to content

Commit d2a11cc

Browse files
authored
Merge pull request #11 from findify/fix/cache-leak
More tests
2 parents 4ec44ba + e663343 commit d2a11cc

File tree

8 files changed

+152
-41
lines changed

8 files changed

+152
-41
lines changed

README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Scala ADT support for Apache Flink
22

3-
[![CI Status](https://github.com/findify/flink-adt/workflows/CI/badge.svg)](https://github.com/metarank/cfor/actions)
3+
[![CI Status](https://github.com/findify/flink-adt/workflows/CI/badge.svg)](https://github.com/findify/flink-adt/actions)
44
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.findify/flink-adt_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/io.github.metarank/cfor_2.13)
5-
[![License: Apache 2](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)
5+
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)
66

77
This is a prototype of Magnolia-based serializer framework for Apache Flink, with
88
more Scala-specific TypeSerializer & TypeInformation derivation support.
@@ -26,7 +26,7 @@ versions
2626

2727
`flink-adt` is released to Maven-central. For SBT, add this snippet to `build.sbt`:
2828
```scala
29-
libraryDependencies += "io.findify" %% "flink-adt" % "0.4.2"
29+
libraryDependencies += "io.findify" %% "flink-adt" % "0.4.3"
3030
```
3131

3232
To use this library, swap `import org.apache.flink.api.scala._` with `import io.findify.flinkadt.api._` and enjoy.
@@ -69,7 +69,7 @@ may have issues while migrating state snapshots from TraversableSerializer to Fl
6969

7070
The MIT License (MIT)
7171

72-
Copyright (c) 2019 Findify AB
72+
Copyright (c) 2021 Findify AB
7373

7474
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
7575

build.sbt

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name := "flink-adt"
22

3-
version := "0.4.2"
3+
version := "0.4.3"
44

55
scalaVersion := "2.12.14"
66

@@ -19,7 +19,8 @@ libraryDependencies ++= Seq(
1919
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
2020
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
2121
"org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
22-
"org.scalatest" %% "scalatest" % "3.2.9" % "test"
22+
"org.scalatest" %% "scalatest" % "3.2.9" % "test",
23+
"org.typelevel" %% "cats-core" % "2.3.0" % "test"
2324
)
2425

2526
scmInfo := Some(

src/main/scala/io/findify/flinkadt/api/package.scala

+8-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ package object api extends LowPrioImplicits {
4242
def combine[T <: Product: ClassTag: TypeTag](
4343
ctx: CaseClass[TypeInformation, T]
4444
): TypeInformation[T] = {
45-
val cacheKey = s"${ctx.typeName.full}_${ctx.typeName.typeArguments}"
45+
val cacheKey = typeName(ctx.typeName)
4646
cache.get(cacheKey) match {
4747
case Some(cached) => cached.asInstanceOf[TypeInformation[T]]
4848
case None =>
@@ -69,7 +69,8 @@ package object api extends LowPrioImplicits {
6969
def dispatch[T: ClassTag](
7070
ctx: SealedTrait[TypeInformation, T]
7171
): TypeInformation[T] = {
72-
cache.get(ctx.typeName.full) match {
72+
val cacheKey = typeName(ctx.typeName)
73+
cache.get(cacheKey) match {
7374
case Some(cached) => cached.asInstanceOf[TypeInformation[T]]
7475
case None =>
7576
val serializer = new CoproductSerializer[T](
@@ -83,11 +84,15 @@ package object api extends LowPrioImplicits {
8384
)
8485
val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
8586
val ti = new CoproductTypeInformation[T](clazz, serializer)
86-
cache.put(ctx.typeName.full, ti)
87+
cache.put(cacheKey, ti)
8788
ti
8889
}
8990
}
9091

92+
private def typeName(tn: magnolia.TypeName): String = {
93+
s"${tn.full}[${tn.typeArguments.map(typeName).mkString(",")}]"
94+
}
95+
9196
private def loadClass(name: String): Option[Class[_]] = {
9297
val sanitized = name.replaceAllLiterally("::", "$colon$colon")
9398
Try(Class.forName(sanitized)) match {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.findify.flinkadt
2+
3+
import cats.data.NonEmptyList
4+
import io.findify.flinkadt.AnyTest.FAny
5+
import io.findify.flinkadt.AnyTest.Filter.{FTerm, StringTerm, TermFilter}
6+
import org.apache.flink.api.common.typeinfo.TypeInformation
7+
import org.apache.flink.api.common.typeutils.TypeSerializer
8+
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
9+
import org.scalatest.flatspec.AnyFlatSpec
10+
import org.scalatest.matchers.should.Matchers
11+
12+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
13+
14+
class AnyTest extends AnyFlatSpec with Matchers with TestUtils {
15+
import io.findify.flinkadt.api._
16+
17+
it should "serialize concrete class" in {
18+
val ser = implicitly[TypeInformation[StringTerm]].createSerializer(null)
19+
roundtrip(ser, StringTerm("fo"))
20+
}
21+
22+
it should "serialize ADT" in {
23+
val ser = implicitly[TypeInformation[FAny]].createSerializer(null)
24+
roundtrip(ser, StringTerm("fo"))
25+
}
26+
27+
it should "serialize NEL" in {
28+
val ser = implicitly[TypeInformation[NonEmptyList[FTerm]]].createSerializer(null)
29+
roundtrip(ser, NonEmptyList.one(StringTerm("fo")))
30+
}
31+
32+
it should "serialize nested nel" in {
33+
val ser = implicitly[TypeInformation[TermFilter]].createSerializer(null)
34+
roundtrip(ser, TermFilter("a", NonEmptyList.one(StringTerm("fo"))))
35+
}
36+
37+
}
38+
39+
object AnyTest {
40+
sealed trait FAny
41+
42+
sealed trait FValueAny extends FAny {
43+
def value: Any
44+
}
45+
object Filter {
46+
sealed trait FTerm extends FValueAny
47+
case class StringTerm(value: String) extends FTerm {
48+
type T = String
49+
}
50+
case class NumericTerm(value: Double) extends FTerm {
51+
type T = Double
52+
}
53+
54+
case class TermFilter(
55+
field: String,
56+
values: NonEmptyList[FTerm]
57+
)
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.findify.flinkadt
2+
3+
import cats.data.NonEmptyList
4+
import org.scalatest.flatspec.AnyFlatSpec
5+
import org.scalatest.matchers.should.Matchers
6+
7+
class CatsTest extends AnyFlatSpec with Matchers with TestUtils {
8+
import io.findify.flinkadt.api._
9+
it should "derive for NEL[String]" in {
10+
val ser = deriveTypeInformation[NonEmptyList[String]].createSerializer(null)
11+
roundtrip(ser, NonEmptyList.one("doo"))
12+
}
13+
it should "derive for NEL[Int]" in {
14+
val ser = deriveTypeInformation[NonEmptyList[Int]].createSerializer(null)
15+
roundtrip(ser, NonEmptyList.one(1))
16+
}
17+
}

src/test/scala/io/findify/flinkadt/ExampleTest.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ class ExampleTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
4141
import io.findify.flinkadt.api._
4242

4343
implicit val eventTypeInfo = deriveTypeInformation[Event]
44-
45-
val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10)
44+
val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10)
4645
result.size shouldBe 2
4746
}
4847

src/test/scala/io/findify/flinkadt/SerializerTest.scala

+18-30
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import io.findify.flinkadt.SerializerTest.{
1010
Annotated,
1111
Bar,
1212
Bar2,
13+
BoundADT,
1314
Foo,
1415
Foo2,
1516
Generic,
1617
ListADT,
1718
Nested,
19+
NestedParent,
1820
Node,
1921
P2,
2022
Param,
@@ -36,7 +38,7 @@ import org.scalatest.flatspec.AnyFlatSpec
3638
import org.scalatest.matchers.should.Matchers
3739
import io.findify.flinkadt.api._
3840

39-
class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
41+
class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils {
4042
import io.findify.flinkadt.api._
4143

4244
it should "derive serializer for simple class" in {
@@ -121,6 +123,16 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
121123
serializable(ser)
122124
}
123125

126+
it should "derive generic type bounded classes" in {
127+
val ser = implicitly[TypeInformation[BoundADT[Foo]]].createSerializer(null)
128+
noKryo(ser)
129+
}
130+
131+
// it should "derive nested generic type bounded classes" in {
132+
// val ser = deriveTypeInformation[NestedParent].createSerializer(null)
133+
// noKryo(ser)
134+
// }
135+
124136
it should "be serializable in case of annotations on classes" in {
125137
val ser = implicitly[TypeInformation[Annotated]].createSerializer(null)
126138
serializable(ser)
@@ -150,35 +162,6 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
150162
all(ser2, Generic(Simple(0, "asd"), Bar(0)))
151163
}
152164

153-
def roundtrip[T](ser: TypeSerializer[T], in: T) = {
154-
val out = new ByteArrayOutputStream()
155-
ser.serialize(in, new DataOutputViewStreamWrapper(out))
156-
val copy = ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
157-
in shouldBe copy
158-
}
159-
160-
def noKryo[T](ser: TypeSerializer[T]): Unit =
161-
ser match {
162-
case p: ScalaCaseClassSerializer[_] =>
163-
forAll(p.getFieldSerializers) { param =>
164-
noKryo(param)
165-
}
166-
case _: KryoSerializer[_] =>
167-
throw new IllegalArgumentException("kryo detected")
168-
case _ => // ok
169-
}
170-
171-
def serializable[T](ser: TypeSerializer[T]) = {
172-
val stream = new ObjectOutputStream(new ByteArrayOutputStream())
173-
stream.writeObject(ser)
174-
}
175-
176-
def all[T](ser: TypeSerializer[T], in: T) = {
177-
roundtrip(ser, in)
178-
noKryo(ser)
179-
serializable(ser)
180-
}
181-
182165
}
183166

184167
object SerializerTest {
@@ -200,6 +183,11 @@ object SerializerTest {
200183

201184
case class WrappedADT(x: ADT)
202185

186+
case class BoundADT[T <: ADT](x: T)
187+
188+
sealed trait NestedParent
189+
case class NestedBoundADT[T <: ADT](x: T) extends NestedParent
190+
203191
@SerialVersionUID(1L)
204192
case class Annotated(foo: String)
205193

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.findify.flinkadt
2+
3+
import org.apache.flink.api.common.typeutils.TypeSerializer
4+
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
5+
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer
6+
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
7+
import org.scalatest.{Inspectors, Suite}
8+
import org.scalatest.matchers.should.Matchers
9+
10+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
11+
12+
trait TestUtils extends Matchers with Inspectors {
13+
def roundtrip[T](ser: TypeSerializer[T], in: T) = {
14+
val out = new ByteArrayOutputStream()
15+
ser.serialize(in, new DataOutputViewStreamWrapper(out))
16+
val copy = ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
17+
in shouldBe copy
18+
}
19+
20+
def noKryo[T](ser: TypeSerializer[T]): Unit =
21+
ser match {
22+
case p: ScalaCaseClassSerializer[_] =>
23+
forAll(p.getFieldSerializers) { param =>
24+
noKryo(param)
25+
}
26+
case _: KryoSerializer[_] =>
27+
throw new IllegalArgumentException("kryo detected")
28+
case _ => // ok
29+
}
30+
31+
def serializable[T](ser: TypeSerializer[T]) = {
32+
val stream = new ObjectOutputStream(new ByteArrayOutputStream())
33+
stream.writeObject(ser)
34+
}
35+
36+
def all[T](ser: TypeSerializer[T], in: T) = {
37+
roundtrip(ser, in)
38+
noKryo(ser)
39+
serializable(ser)
40+
}
41+
42+
}

0 commit comments

Comments
 (0)