Skip to content

Commit

Permalink
fix cache leak
Browse files Browse the repository at this point in the history
  • Loading branch information
shuttie committed Aug 26, 2021
1 parent 4ec44ba commit e663343
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 41 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Scala ADT support for Apache Flink

[![CI Status](https://github.com/findify/flink-adt/workflows/CI/badge.svg)](https://github.com/metarank/cfor/actions)
[![CI Status](https://github.com/findify/flink-adt/workflows/CI/badge.svg)](https://github.com/findify/flink-adt/actions)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.findify/flink-adt_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/io.findify/flink-adt_2.12)
[![License: Apache 2](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)

This is a prototype of a Magnolia-based serializer framework for Apache Flink, with
more Scala-specific TypeSerializer & TypeInformation derivation support.
Expand All @@ -26,7 +26,7 @@ versions

`flink-adt` is released to Maven-central. For SBT, add this snippet to `build.sbt`:
```scala
libraryDependencies += "io.findify" %% "flink-adt" % "0.4.2"
libraryDependencies += "io.findify" %% "flink-adt" % "0.4.3"
```

To use this library, swap `import org.apache.flink.api.scala._` with `import io.findify.flinkadt.api._` and enjoy.
Expand Down Expand Up @@ -69,7 +69,7 @@ may have issues while migrating state snapshots from TraversableSerializer to Fl

The MIT License (MIT)

Copyright (c) 2019 Findify AB
Copyright (c) 2021 Findify AB

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "flink-adt"

version := "0.4.2"
version := "0.4.3"

scalaVersion := "2.12.14"

Expand All @@ -19,7 +19,8 @@ libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
"org.scalatest" %% "scalatest" % "3.2.9" % "test"
"org.scalatest" %% "scalatest" % "3.2.9" % "test",
"org.typelevel" %% "cats-core" % "2.3.0" % "test"
)

scmInfo := Some(
Expand Down
11 changes: 8 additions & 3 deletions src/main/scala/io/findify/flinkadt/api/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ package object api extends LowPrioImplicits {
def combine[T <: Product: ClassTag: TypeTag](
ctx: CaseClass[TypeInformation, T]
): TypeInformation[T] = {
val cacheKey = s"${ctx.typeName.full}_${ctx.typeName.typeArguments}"
val cacheKey = typeName(ctx.typeName)
cache.get(cacheKey) match {
case Some(cached) => cached.asInstanceOf[TypeInformation[T]]
case None =>
Expand All @@ -69,7 +69,8 @@ package object api extends LowPrioImplicits {
def dispatch[T: ClassTag](
ctx: SealedTrait[TypeInformation, T]
): TypeInformation[T] = {
cache.get(ctx.typeName.full) match {
val cacheKey = typeName(ctx.typeName)
cache.get(cacheKey) match {
case Some(cached) => cached.asInstanceOf[TypeInformation[T]]
case None =>
val serializer = new CoproductSerializer[T](
Expand All @@ -83,11 +84,15 @@ package object api extends LowPrioImplicits {
)
val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
val ti = new CoproductTypeInformation[T](clazz, serializer)
cache.put(ctx.typeName.full, ti)
cache.put(cacheKey, ti)
ti
}
}

/** Renders a Magnolia type name as `full[arg1,arg2,...]`, rendering each type argument
  * recursively in the same format. A type without type arguments renders as `full[]`.
  */
private def typeName(tn: magnolia.TypeName): String = {
  val renderedArguments = tn.typeArguments.map(argument => typeName(argument)).mkString(",")
  s"${tn.full}[$renderedArguments]"
}

private def loadClass(name: String): Option[Class[_]] = {
val sanitized = name.replaceAllLiterally("::", "$colon$colon")
Try(Class.forName(sanitized)) match {
Expand Down
59 changes: 59 additions & 0 deletions src/test/scala/io/findify/flinkadt/AnyTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.findify.flinkadt

import cats.data.NonEmptyList
import io.findify.flinkadt.AnyTest.FAny
import io.findify.flinkadt.AnyTest.Filter.{FTerm, StringTerm, TermFilter}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

/** Checks that implicitly resolved TypeInformation instances for the AnyTest fixtures
  * produce serializers that pass the `roundtrip` check mixed in from TestUtils.
  */
class AnyTest extends AnyFlatSpec with Matchers with TestUtils {
  import io.findify.flinkadt.api._

  it should "serialize concrete class" in {
    // Resolved for the leaf case class itself.
    val typeInfo   = implicitly[TypeInformation[StringTerm]]
    val serializer = typeInfo.createSerializer(null)
    val term       = StringTerm("fo")
    roundtrip(serializer, term)
  }

  it should "serialize ADT" in {
    // Resolved for the root sealed trait; the value is a leaf of that hierarchy.
    val typeInfo     = implicitly[TypeInformation[FAny]]
    val serializer   = typeInfo.createSerializer(null)
    val member: FAny = StringTerm("fo")
    roundtrip(serializer, member)
  }

  it should "serialize NEL" in {
    val typeInfo                   = implicitly[TypeInformation[NonEmptyList[FTerm]]]
    val serializer                 = typeInfo.createSerializer(null)
    val terms: NonEmptyList[FTerm] = NonEmptyList.one(StringTerm("fo"))
    roundtrip(serializer, terms)
  }

  it should "serialize nested nel" in {
    // The NonEmptyList is a field of a case class here.
    val typeInfo   = implicitly[TypeInformation[TermFilter]]
    val serializer = typeInfo.createSerializer(null)
    val filter     = TermFilter("a", NonEmptyList.one(StringTerm("fo")))
    roundtrip(serializer, filter)
  }

}

// Fixture types for AnyTest: a sealed hierarchy whose intermediate trait exposes an
// `Any`-typed member, plus a case class holding a cats NonEmptyList of hierarchy members.
object AnyTest {
// Root of the hierarchy.
sealed trait FAny

// Intermediate level: declares `value` with the static type Any; the case classes
// below implement it with a concrete type.
sealed trait FValueAny extends FAny {
def value: Any
}
object Filter {
// Term level of the hierarchy; the two case classes below are its members.
sealed trait FTerm extends FValueAny
// String-valued term; also declares a type member T alongside the constructor field.
case class StringTerm(value: String) extends FTerm {
type T = String
}
// Double-valued term; also declares a type member T alongside the constructor field.
case class NumericTerm(value: Double) extends FTerm {
type T = Double
}

// Not part of the FAny hierarchy: a plain case class whose `values` field is a
// NonEmptyList of the sealed FTerm type.
case class TermFilter(
field: String,
values: NonEmptyList[FTerm]
)
}
}
17 changes: 17 additions & 0 deletions src/test/scala/io/findify/flinkadt/CatsTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.findify.flinkadt

import cats.data.NonEmptyList
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

/** Checks that TypeInformation can be derived explicitly for cats' NonEmptyList with
  * String and Int elements, and that the resulting serializers pass the `roundtrip`
  * check mixed in from TestUtils.
  */
class CatsTest extends AnyFlatSpec with Matchers with TestUtils {
  import io.findify.flinkadt.api._

  it should "derive for NEL[String]" in {
    val typeInfo   = deriveTypeInformation[NonEmptyList[String]]
    val serializer = typeInfo.createSerializer(null)
    val sample     = NonEmptyList.one("doo")
    roundtrip(serializer, sample)
  }

  it should "derive for NEL[Int]" in {
    val typeInfo   = deriveTypeInformation[NonEmptyList[Int]]
    val serializer = typeInfo.createSerializer(null)
    val sample     = NonEmptyList.one(1)
    roundtrip(serializer, sample)
  }
}
3 changes: 1 addition & 2 deletions src/test/scala/io/findify/flinkadt/ExampleTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ class ExampleTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
import io.findify.flinkadt.api._

implicit val eventTypeInfo = deriveTypeInformation[Event]

val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10)
val result = env.fromCollection(List[Event](Click("1"), Purchase(1.0))).executeAndCollect(10)
result.size shouldBe 2
}

Expand Down
48 changes: 18 additions & 30 deletions src/test/scala/io/findify/flinkadt/SerializerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import io.findify.flinkadt.SerializerTest.{
Annotated,
Bar,
Bar2,
BoundADT,
Foo,
Foo2,
Generic,
ListADT,
Nested,
NestedParent,
Node,
P2,
Param,
Expand All @@ -36,7 +38,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import io.findify.flinkadt.api._

class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils {
import io.findify.flinkadt.api._

it should "derive serializer for simple class" in {
Expand Down Expand Up @@ -121,6 +123,16 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
serializable(ser)
}

it should "derive generic type bounded classes" in {
val ser = implicitly[TypeInformation[BoundADT[Foo]]].createSerializer(null)
noKryo(ser)
}

// it should "derive nested generic type bounded classes" in {
// val ser = deriveTypeInformation[NestedParent].createSerializer(null)
// noKryo(ser)
// }

it should "be serializable in case of annotations on classes" in {
val ser = implicitly[TypeInformation[Annotated]].createSerializer(null)
serializable(ser)
Expand Down Expand Up @@ -150,35 +162,6 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors {
all(ser2, Generic(Simple(0, "asd"), Bar(0)))
}

def roundtrip[T](ser: TypeSerializer[T], in: T) = {
val out = new ByteArrayOutputStream()
ser.serialize(in, new DataOutputViewStreamWrapper(out))
val copy = ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
in shouldBe copy
}

def noKryo[T](ser: TypeSerializer[T]): Unit =
ser match {
case p: ScalaCaseClassSerializer[_] =>
forAll(p.getFieldSerializers) { param =>
noKryo(param)
}
case _: KryoSerializer[_] =>
throw new IllegalArgumentException("kryo detected")
case _ => // ok
}

def serializable[T](ser: TypeSerializer[T]) = {
val stream = new ObjectOutputStream(new ByteArrayOutputStream())
stream.writeObject(ser)
}

def all[T](ser: TypeSerializer[T], in: T) = {
roundtrip(ser, in)
noKryo(ser)
serializable(ser)
}

}

object SerializerTest {
Expand All @@ -200,6 +183,11 @@ object SerializerTest {

case class WrappedADT(x: ADT)

case class BoundADT[T <: ADT](x: T)

sealed trait NestedParent
case class NestedBoundADT[T <: ADT](x: T) extends NestedParent

@SerialVersionUID(1L)
case class Annotated(foo: String)

Expand Down
42 changes: 42 additions & 0 deletions src/test/scala/io/findify/flinkadt/TestUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.findify.flinkadt

import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.scalatest.{Inspectors, Suite}
import org.scalatest.matchers.should.Matchers

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}

/** Shared checks for serializer tests: a binary round-trip, detection of Kryo serializers,
  * and Java-serializability of the serializer instance itself.
  */
trait TestUtils extends Matchers with Inspectors {

  /** Writes `in` with `ser` into an in-memory buffer, reads it back with the same
    * serializer and asserts that the original equals the deserialized copy.
    *
    * @param ser serializer under test
    * @param in  value to serialize and compare against
    */
  def roundtrip[T](ser: TypeSerializer[T], in: T) = {
    val out = new ByteArrayOutputStream()
    ser.serialize(in, new DataOutputViewStreamWrapper(out))
    val copy = ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
    in shouldBe copy
  }

  /** Fails when `ser` is a KryoSerializer, or when any field serializer of a
    * ScalaCaseClassSerializer is one (checked recursively). Every other serializer
    * kind is accepted without looking inside it.
    *
    * @throws IllegalArgumentException when a KryoSerializer is found
    */
  def noKryo[T](ser: TypeSerializer[T]): Unit =
    ser match {
      case p: ScalaCaseClassSerializer[_] =>
        forAll(p.getFieldSerializers) { param =>
          noKryo(param)
        }
      case _: KryoSerializer[_] =>
        throw new IllegalArgumentException("kryo detected")
      case _ => // ok
    }

  /** Checks that the serializer instance can be written with Java serialization;
    * `writeObject` throws if it (or anything it references) is not serializable.
    */
  def serializable[T](ser: TypeSerializer[T]): Unit = {
    val stream = new ObjectOutputStream(new ByteArrayOutputStream())
    // Close the stream even when writeObject throws, so it is not left open.
    try stream.writeObject(ser)
    finally stream.close()
  }

  /** Runs all three checks: round-trip of `in`, no Kryo serializers, Java-serializability. */
  def all[T](ser: TypeSerializer[T], in: T): Unit = {
    roundtrip(ser, in)
    noKryo(ser)
    serializable(ser)
  }

}

0 comments on commit e663343

Please sign in to comment.