You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hi, I'm testing out this library, and it seems that scala Enumeration types aren't supported in this library.
Test code:
class AdtSerializerTest {
lazy val cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(1).setNumberTaskManagers(1).build()
)
lazy val env: StreamExecutionEnvironment = {
cluster.getTestEnvironment.setAsContext()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
env.enableCheckpointing(1000)
env.setRestartStrategy(RestartStrategies.noRestart())
env.getConfig.disableGenericTypes()
env
}
@BeforeEach
def beforeEach(): Unit = {
cluster.before()
}
@AfterEach
def afterEach(): Unit = {
cluster.after()
}
@Test
def serializerTest(): Unit = {
val result = env
.fromScalaCollection(
List(A("abc", 0.2022f, SomeEnum.BAR), B(-22, SomeCaseClass(-0.2022f, SomeEnum.FOO)))
).executeAndCollect(10)
assertEquals(2, result.size)
}
}
object AdtSerializerTestAssets {
object SomeEnum extends Enumeration {
type SomeEnum = Value
val FOO: SomeEnum = Value("FOO")
val BAR: SomeEnum = Value("BAR")
}
sealed trait Event extends Product with Serializable
object Event{
case class A(someStr: String, someFloat: Float, someEnum: SomeEnum) extends Event
case class B(someLong: Long, someCaseClass: SomeCaseClass) extends Event
case class SomeCaseClass(someFloat: Float, someEnum: SomeEnum)
implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation
}
implicit final class FlinkEnvOps(private val env: StreamExecutionEnvironment) extends AnyVal {
import scala.collection.JavaConverters._
def fromScalaCollection[A](data: Seq[A])(implicit typeInformation: TypeInformation[A]): DataStreamSource[A] =
env.fromCollection(data.asJava, typeInformation)
}
}
Output:
[Error].../AdtSerializerTest.scala:49: could not find implicit value for parameter typeInformation: org.apache.flink.api.common.typeinfo.TypeInformation[...AdtSerializerTestAssets.Event with Serializable]
[Error]...misc/AdtSerializerTest.scala:71: magnolia: could not find TypeInformation.Typeclass for type ....AdtSerializerTestAssets.SomeEnum.SomeEnum
in parameter 'someEnum' of product type....AdtSerializerTestAssets.Event.A
in coproduct type ...AdtSerializerTestAssets.Event
Thank you.
The text was updated successfully, but these errors were encountered:
Edit: I tried and Scala 3 enums don't work out of the box in Flink, it seems like the Java enum compat is not enough. However it looks like this lib has support for Scala 3 enums?
Hi, I'm testing out this library, and it seems that scala
Enumeration
types aren't supported in this library.Test code:
Output:
Thank you.
The text was updated successfully, but these errors were encountered: