Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Are Scala Enumeration types supported? #28

Open
jcho7022 opened this issue Jun 17, 2022 · 2 comments
Open

Are Scala Enumeration types supported? #28

jcho7022 opened this issue Jun 17, 2022 · 2 comments

Comments

@jcho7022
Copy link

Hi, I'm testing out this library, and it seems that scala Enumeration types aren't supported in this library.

Test code:

class AdtSerializerTest {
  lazy val cluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(1).setNumberTaskManagers(1).build()
  )

  lazy val env: StreamExecutionEnvironment = {
    cluster.getTestEnvironment.setAsContext()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.enableCheckpointing(1000)
    env.setRestartStrategy(RestartStrategies.noRestart())
    env.getConfig.disableGenericTypes()
    env
  }

  @BeforeEach
  def beforeEach(): Unit = {
    cluster.before()
  }

  @AfterEach
  def afterEach(): Unit = {
    cluster.after()
  }

  @Test
  def serializerTest(): Unit = {
    val result = env
      .fromScalaCollection(
        List(A("abc", 0.2022f, SomeEnum.BAR), B(-22, SomeCaseClass(-0.2022f, SomeEnum.FOO)))
      ).executeAndCollect(10)

    assertEquals(2, result.size)
  }
}

object AdtSerializerTestAssets {
  object SomeEnum extends Enumeration {
    type SomeEnum = Value

    val FOO: SomeEnum = Value("FOO")
    val BAR: SomeEnum = Value("BAR")
  }

  sealed trait Event extends Product with Serializable
  object Event{
    case class A(someStr: String, someFloat: Float, someEnum: SomeEnum) extends Event
    case class B(someLong: Long, someCaseClass: SomeCaseClass) extends Event

    case class SomeCaseClass(someFloat: Float, someEnum: SomeEnum)

    implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation
  }

  implicit final class FlinkEnvOps(private val env: StreamExecutionEnvironment) extends AnyVal {
    import scala.collection.JavaConverters._

    def fromScalaCollection[A](data: Seq[A])(implicit typeInformation: TypeInformation[A]): DataStreamSource[A] =
      env.fromCollection(data.asJava, typeInformation)
  }
}

Output:

[Error].../AdtSerializerTest.scala:49: could not find implicit value for parameter typeInformation: org.apache.flink.api.common.typeinfo.TypeInformation[...AdtSerializerTestAssets.Event with Serializable]
[Error]...misc/AdtSerializerTest.scala:71: magnolia: could not find TypeInformation.Typeclass for type ....AdtSerializerTestAssets.SomeEnum.SomeEnum
    in parameter 'someEnum' of product type....AdtSerializerTestAssets.Event.A
    in coproduct type ...AdtSerializerTestAssets.Event

Thank you.

@erwan
Copy link

erwan commented Feb 2, 2023

Scala 3 enums can be created as compatibly with Java enums, and those work out of the box in Flink:

https://docs.scala-lang.org/scala3/reference/enums/enums.html#compatibility-with-java-enums

Edit: I tried and Scala 3 enums don't work out of the box in Flink, it seems like the Java enum compat is not enough. However it looks like this lib has support for Scala 3 enums?

@mzuehlke
Copy link

mzuehlke commented Feb 2, 2023

There is a test showing support for Scala enums (even without the Java compatibility):
https://github.com/findify/flink-adt/blob/8926841b826fbdb0512166e31a75ecd1ee60811a/src/test/scala-3/io/findify/flinkadt/Scala3EnumTest.scala

I just guess the "old" Scala 2 Enumeration are not supported ?

Or ist just the TypeInformation for SomeEnum missing ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants