A possible cause of duplicated case objects bug #187
To give more details, I discovered the root cause of my duplicated case object problem by logging a stack trace in the body of one of my case objects:

I was surprised to see that Kryo was involved in state serialization and was happily instantiating my case object when writing checkpoints. It was due to the usage of:

    val BroadcastStateDescriptor: MapStateDescriptor[Long, ApiMessage] =
      new MapStateDescriptor("received-apiMessages", classOf[Long], classOf[ApiMessage])

instead of:

    val BroadcastStateDescriptor: MapStateDescriptor[Long, ApiMessage] =
      new MapStateDescriptor("received-apiMessages",
        implicitly[TypeInformation[Long]],
        implicitly[TypeInformation[ApiMessage]])

For years, my colleagues and I didn't know Kryo was used for state serialization because of these constructors with … Also note that we are using a …
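The effect on a case object can be reproduced without Flink. This is a minimal, self-contained Scala sketch and does not use Kryo itself. It assumes that Kryo, when no Scala-aware serializer (such as Chill's) is registered, can fall back to an Objenesis-style instantiator that creates objects without going through the singleton. To imitate that, it uses `sun.misc.Unsafe#allocateInstance`, which allocates an object without running its constructor. The names `CaseObjectDuplication`, `Status`, `Active`, `Inactive`, `duplicate` and `describe` are hypothetical.

```scala
import sun.misc.Unsafe

object CaseObjectDuplication {
  sealed trait Status
  case object Active extends Status
  case object Inactive extends Status

  // Obtain sun.misc.Unsafe reflectively (HotSpot-specific, for illustration only)
  private val unsafe: Unsafe = {
    val field = classOf[Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    field.get(null).asInstanceOf[Unsafe]
  }

  // Allocates a second instance of the singleton's class without calling its
  // constructor; assumption: roughly what an Objenesis-based instantiator does
  def duplicate[A <: AnyRef](singleton: A): A =
    unsafe.allocateInstance(singleton.getClass).asInstanceOf[A]

  // Exhaustive match over the sealed trait, with no default case
  def describe(s: Status): String = s match {
    case Active   => "active"
    case Inactive => "inactive"
  }

  def main(args: Array[String]): Unit = {
    val copy: Status = duplicate(Active)
    println(copy)           // Active (the generated toString still works)
    println(copy eq Active) // false: a second "singleton" now exists
    println(copy == Active) // false: case objects compare by reference
    try println(describe(copy))
    catch { case e: MatchError => println(s"scala.MatchError: ${e.getMessage}") }
  }
}
```

The copy prints as `Active`, but it is a different instance. Case objects do not override `equals`, so `case Active =>` no longer matches it, and the match ends in a `scala.MatchError` with the same shape as `None (of class scala.None$)`.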
Hi @arnor2000, thanks a lot for digging deeper into it. This is a really good finding, and I think it will also help other users of the Scala API. As I understand it, the confusion comes when … Although, as you correctly highlighted, the library promises not to fall back silently to Kryo, the fallback can still happen, even for Scala data classes, when user code uses Java-oriented constructors. I see the following actions to improve the situation:
What do you think?
Exactly! In our case it was an unintended usage of Kryo, but I suppose the duplication of case objects will also occur with explicit usage of Kryo with … There are several borderline usages of Kryo in examples, like in … About the actions you propose, the first one is the best but seems hard to achieve. I was thinking of extending … Clearly document the consequences of removing …
This library never claimed that it can prevent the usage of Kryo if user code does not use its serializers via:

    import org.apache.flinkx.api.serializers._
    // and then
    implicitly[TypeInformation[Long]]

Agree? This is an example where a user does not use the proper Scala serializers:

    new MapStateDescriptor("received-apiMessages", classOf[Long], classOf[ApiMessage])

MapStateDescriptor is a Java class, not a Scala class. So what you are saying is that a user will be confused, because the README says there is no silent fallback to Kryo, but actually there is. Since the user opted out of the proper serializers, they are on their own; this is a user fault. Of course, we can say that this library, or its documentation, could do better. So, first, we need to clarify this point in the docs. Second, we need to describe the "duplication of case objects" issue and why it happens. Third, we could try to improve the library code to intercept case object serialization and then either fail fast (3.1) or switch the entire serialization process to the proper serializers (3.2). I am not sure whether this third point, including its options 3.1 and 3.2, is doable.

@arnaud-daroussin this section https://github.com/flink-extended/flink-scala-api?tab=readme-ov-file#using-a-pojo-only-flink-serialization-framework describes the fallback to Kryo; I missed this point as well.
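Regarding the fail-fast option (3.1), Flink already has a general guard that does not depend on flink-scala-api. This is the `pipeline.generic-types` option, exposed as `PipelineOptions.GENERIC_TYPES`. When it is `false`, Flink is expected to throw `UnsupportedOperationException` for any type it would otherwise serialize with Kryo. A `classOf[...]`-based state descriptor for a case class should then fail when its serializer is created, instead of silently writing Kryo state. The following is a sketch under that assumption. It uses Flink's Java `StreamExecutionEnvironment` directly, and `KryoFailFast`, `kryoFreeConfiguration` and `environment` are hypothetical names. Behaviour may differ between Flink versions.

```scala
import org.apache.flink.configuration.{Configuration, PipelineOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object KryoFailFast {
  // Same intent as `pipeline.generic-types: false` in the Flink configuration file:
  // types that Flink can only treat as generic (i.e. hand to Kryo) should make
  // serializer creation fail instead of silently falling back to Kryo.
  def kryoFreeConfiguration(): Configuration = {
    val conf = new Configuration()
    conf.set(PipelineOptions.GENERIC_TYPES, java.lang.Boolean.FALSE)
    conf
  }

  // Builds an environment with generic types disabled; a job using, e.g.,
  //   new MapStateDescriptor("received-apiMessages", classOf[Long], classOf[ApiMessage])
  // on this environment is expected to fail fast rather than write Kryo state.
  def environment(): StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment(kryoFreeConfiguration())
}
```

The guard is global: it also rejects any legitimate Kryo usage in the job. On a cluster, the same switch would normally go into the Flink configuration file rather than into code.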
Hi @novakov-alexey, I think the documentation work is done! About the code improvement, the ideal would be to handle typing with … One solution could be to register all types brought by flink-scala-api in … As for the TypeInformations generated for case classes, the call to …

    TypeInformation apimessageTypeclass$macro$30 = var35.join(var38, var40, ((TypeTags)$u).TypeTag().apply((Mirror)$m, new ApiMessage$$typecreator29$1()));
    this.info = apimessageTypeclass$macro$30;
    this.bitmap$init$0 = (byte)(this.bitmap$init$0 | 32);

Although I have no idea how to do this with Magnolia. Another possibility would be to use the cache (…). In short, I'm trying to propose ideas, but the solution doesn't seem obvious!
Hi @novakov-alexey,

I think I have pinpointed a major cause of the `scala.MatchError: None (of class scala.None$)` bug. It actually affects all case objects, because several instances are created of what should be a singleton. Other users have experienced this bug, and it may be related to #148 and #106. Another manifestation, on a common Scala case object, is on `Nil`:

The problem appears only when we remove `flink-scala_2.12-*.jar` from the Flink distribution (`lib/flink-scala_2.12-*.jar`), as Flink recommends (i.e. https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/). The reason is that this jar contains a lot of custom Kryo serializers to handle Scala gracefully. Notably, it contains `FlinkScalaKryoInstantiator`, which uses Chill and allows Kryo to handle singleton `object`s and other Scala specificities.

Now, the question is: why does Kryo continue to be involved in serialization, even though we shouldn't use it anymore thanks to the flink-scala-api "no silent fallback to Kryo" feature?
Because of this constructor: `org.apache.flink.api.common.state.ValueStateDescriptor.ValueStateDescriptor(java.lang.String, java.lang.Class<T>)`, and likewise for `MapStateDescriptor`, etc. It is not documented at all that these constructors with a `Class` parameter leave the door wide open to Kryo. flink-scala-api can do nothing about it because they are not part of the Scala API.

At the very least, the Migration section should document switching the `*StateDescriptor` constructors from a `Class` parameter to a `TypeInformation` parameter. I hope it can spare other users the weeks of headaches it cost me!

Thanks!
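The migration suggested above could look roughly like this. This is a sketch that assumes flink-scala-api's `org.apache.flinkx.api.serializers._` import and its `deriveTypeInformation` macro. `ApiMessage` is a hypothetical stand-in for the reporter's message type, and `StateDescriptors`, `apiMessageInfo`, `viaClass`, `viaTypeInfo` and `lastMessage` are illustrative names.

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

object StateDescriptors {
  // Hypothetical message type standing in for the reporter's ApiMessage
  case class ApiMessage(id: Long, payload: String)

  // TypeInformation derived by flink-scala-api, no Kryo involved
  implicit val apiMessageInfo: TypeInformation[ApiMessage] = deriveTypeInformation[ApiMessage]

  // Before: Class-based constructor. Flink's Java type extraction analyses
  // ApiMessage, which is not a valid POJO, so it becomes a generic (Kryo) type.
  val viaClass: MapStateDescriptor[Long, ApiMessage] =
    new MapStateDescriptor("received-apiMessages", classOf[Long], classOf[ApiMessage])

  // After: TypeInformation-based constructor, using flink-scala-api serializers
  val viaTypeInfo: MapStateDescriptor[Long, ApiMessage] =
    new MapStateDescriptor(
      "received-apiMessages",
      implicitly[TypeInformation[Long]],
      implicitly[TypeInformation[ApiMessage]]
    )

  // Same switch for ValueStateDescriptor(String, Class[T]) -> (String, TypeInformation[T])
  val lastMessage: ValueStateDescriptor[ApiMessage] =
    new ValueStateDescriptor("last-apiMessage", implicitly[TypeInformation[ApiMessage]])
}
```

The two `MapStateDescriptor`s look alike, but, as far as the Class-based constructor goes, the first one lets Flink's `TypeExtractor` decide how to serialize `ApiMessage`. A case class ends up as a `GenericTypeInfo` there, which means Kryo. The second one only uses the `TypeInformation` provided by flink-scala-api.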