
A possible cause of duplicated case objects bug #187

Open
arnor2000 opened this issue Dec 19, 2024 · 5 comments

Comments

@arnor2000

arnor2000 commented Dec 19, 2024

Hi @novakov-alexey,
I think I have pinpointed a major cause of the scala.MatchError: None (of class scala.None$) bug: it actually affects all case objects, by creating several instances of what should be a singleton. Other users have experienced this bug, and it may be related to #148 and #106.

Another manifestation, on the common Scala case object Nil:

java.util.NoSuchElementException: head of empty list
	at scala.collection.immutable.Nil$.head(List.scala:469)
	at scala.collection.immutable.Nil$.head(List.scala:466)
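This duplication can be reproduced without Flink at all. The sketch below (the Status/Active names are made up for illustration) imitates what Kryo's default instantiator effectively does: it invokes the case object's private constructor reflectively instead of reusing the MODULE$ singleton, and pattern matching then fails just like the MatchError above:

```scala
import scala.util.Try

sealed trait Status
case object Active extends Status

object DuplicationDemo {
  def main(args: Array[String]): Unit = {
    // Imitate Kryo's reflective instantiation: call the private
    // constructor of the case object's class directly.
    val ctor = Active.getClass.getDeclaredConstructors.head
    ctor.setAccessible(true)
    val duplicate = ctor.newInstance().asInstanceOf[Status]

    // Two "singleton" instances now exist; they are not even equal,
    // because case objects use reference equality.
    assert(!(duplicate eq Active))
    assert(duplicate != Active)

    // Matching against the stable identifier fails the same way as the
    // scala.MatchError: None reported above.
    val result = Try(duplicate match { case Active => "matched" })
    assert(result.isFailure)
    println(result.failed.get.getClass.getSimpleName) // MatchError
  }
}
```

This is only a model of the mechanism; in the real failure, the reflective call is made by Kryo's FieldSerializer during checkpointing.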

The problem appears only when we remove flink-scala_2.12-*.jar from the Flink distribution (lib/flink-scala_2.12-*.jar), as Flink recommends (see https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/). The reason is that this jar contains a lot of custom Kryo serializers to handle Scala gracefully, notably FlinkScalaKryoInstantiator, which uses Chill and allows Kryo to handle singleton objects and other Scala specifics.

Now, the question is: why does Kryo continue to be involved in serialization, despite the fact that we shouldn't be using it anymore thanks to flink-scala-api's "no silent fallback to Kryo" feature?

Because of this constructor: org.apache.flink.api.common.state.ValueStateDescriptor.ValueStateDescriptor(java.lang.String, java.lang.Class<T>). The same applies to MapStateDescriptor, etc.
[Screenshot: ValueStateDescriptor constructor]

It is not documented anywhere that these constructors with a Class param leave the door wide open to Kryo. And flink-scala-api can do nothing about it, because they are not part of the Scala API.

At the very least, the Migration section should document switching from the *StateDescriptor constructors that take a Class to the ones that take a TypeInformation param. I hope this can spare other users the weeks of headaches it cost me!
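For reference, a hedged sketch of that migration for ValueStateDescriptor (the state name and value type here are made up; it assumes the flink-scala-api serializers import is in scope):

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

object StateDescriptors {
  // Kryo-prone: the Class-based constructor routes through Flink's
  // TypeExtractor, which may fall back to Kryo for Scala types.
  val kryoProne: ValueStateDescriptor[Option[String]] =
    new ValueStateDescriptor("last-value", classOf[Option[String]])

  // Kryo-free: pass the TypeInformation derived by flink-scala-api.
  val kryoFree: ValueStateDescriptor[Option[String]] =
    new ValueStateDescriptor("last-value", implicitly[TypeInformation[Option[String]]])
}
```

Both constructors exist on ValueStateDescriptor; only the TypeInformation-based one keeps serialization under flink-scala-api's control.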

Thanks!

@arnor2000 arnor2000 changed the title Cause of duplicated case objects bug A possible cause of duplicated case objects bug Dec 20, 2024
novakov-alexey added a commit that referenced this issue Dec 20, 2024
* Documents state migration

* Update README.md

Co-authored-by: Arnaud <[email protected]>

* Validates snippets with mdoc

* Improves readiness

---------

Co-authored-by: novakov-alexey <[email protected]>
@arnaud-daroussin
Contributor

To give more details: I discovered the root cause of my duplicated case object problem by logging a stack trace in the body of one of my case objects:

java.lang.Exception: this exception is logged at TotalCount init
	at com.xxx.aggregations.AggregationFunction$TotalCount$.<init>(AggregationFunction.scala:96)
	at com.xxx.aggregations.AggregationFunction$TotalCount$ConstructorAccess.newInstance(Unknown Source)
	at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy$1.newInstance(Kryo.java:1193)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.copy(DefaultArraySerializers.java:390)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.copy(DefaultArraySerializers.java:289)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:298)
	at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:112)
	at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:70)
	at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:88)
	at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:41)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:96)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:40)
	at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Thread.java:829)

I was surprised to see that Kryo was involved in state serialization and was happily instantiating my case object when writing checkpoints.

It was due to the use of:

  val BroadcastStateDescriptor: MapStateDescriptor[Long, ApiMessage] =
    new MapStateDescriptor("received-apiMessages", classOf[Long], classOf[ApiMessage])

instead of:

  val BroadcastStateDescriptor: MapStateDescriptor[Long, ApiMessage] =
    new MapStateDescriptor("received-apiMessages",
      implicitly[TypeInformation[Long]],
      implicitly[TypeInformation[ApiMessage]])

For years, my colleagues and I didn't know that Kryo was used for state serialization because of these constructors with Class params; this information should be widely shared!

Also note that we are using a HashMapStateBackend; this behavior may differ with other state backends.
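As an aside, a Flink-free sketch of why plain Java serialization does not duplicate case objects while Kryo's FieldSerializer does: the Scala compiler emits a readResolve() method on serializable case objects, which ObjectInputStream honors but Kryo's reflective instantiation bypasses (Token is a made-up example):

```scala
import java.io._

case object Token

object ReadResolveDemo {
  def main(args: Array[String]): Unit = {
    // Round-trip the case object through Java serialization.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(Token)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val roundTripped = in.readObject()

    // Same instance: the generated readResolve() redirected
    // deserialization back to the MODULE$ singleton.
    assert(roundTripped eq Token)
  }
}
```

Kryo's FieldSerializer never calls readResolve; it instantiates the class reflectively, which is how the duplicates in the stack trace above appear.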

@novakov-alexey
Collaborator

novakov-alexey commented Dec 21, 2024

Hi @arnor2000

Thanks a lot for digging deeper into this. It is a really good finding. I think it will also help other users of the Scala API, indeed.

As I understand it, the confusion arises when flink-scala_2.12-*.jar is removed from a Flink job, but user code still uses Java-API-focused constructors instead of implicitly[TypeInformation[T]], which may lead to Kryo usage, as in the case of ValueStateDescriptor(...).

Although, as you have correctly highlighted, the library promises not to fall back silently to Kryo, this can still happen even for Scala data classes when user code uses Java-oriented constructors.

I see the following actions to improve the situation:

  1. Try to prevent Kryo usage even from Java-oriented constructors. I am not sure if this is feasible at all. Perhaps some Java reflection needs to be used at runtime to report a meaningful error message.
  2. Alternatively, update the README to say that this library does not handle Kryo usage when user code uses Java-oriented constructors like the one you mentioned.

What do you think?

@arnaud-daroussin
Contributor

arnaud-daroussin commented Dec 23, 2024

Exactly!

In our case it was an unintended usage of Kryo, but I suppose the duplication of case objects will also occur with explicit usage of Kryo via TypeInformation.of(classOf[Option[String]]); it can be really error-prone. It's worth adding a warning about Kryo usage in case of flink-scala_2.12-*.jar removal.

There are several borderline usages of Kryo in the examples, like in FraudDetector: TypeInformation.of(classOf[Alert]) and TypeInformation.of(classOf[KeyedFraudState]). If these case classes used an Option somewhere, any None value would be a candidate for duplication.

About the actions you propose, the first one is the best, but it seems hard to achieve. I was thinking of extending *StateDescriptor with our own *ScalaStateDescriptor to hide the constructors with Class params, but the only place to enforce it with the current Scala API is org.apache.flinkx.api.DataStream#broadcast(broadcastStateDescriptors: MapStateDescriptor[_, _]*). Enforcing it everywhere would require providing our own RuntimeContext, so it looks like a burden. Or maybe we could register a special TypeInfoFactory on Product using TypeExtractor.registerFactory(), since the "Java-oriented" StateDescriptor constructors use it, but I don't know the implications...

Clearly documenting the consequences that removing flink-scala_2.12-*.jar has on Kryo could be a good first step, for sure.

@novakov-alexey
Collaborator

novakov-alexey commented Dec 24, 2024

This library never claimed that it can prevent the usage of Kryo if user code does not use its serializers via:

import org.apache.flinkx.api.serializers._
// and then
implicitly[TypeInformation[Long]]

Agree?

This is an example where a user does not use the proper Scala serializers:

new MapStateDescriptor("received-apiMessages", classOf[Long], classOf[ApiMessage])

MapStateDescriptor is a Java class, not a Scala class.

So what you are saying is that a user will be confused, because the README says there is no silent fallback to Kryo, but actually there is. Since the user opted out of using the proper serializers, they are now on their own. This is a user fault.

Of course, we can say that this library, or its documentation, could do better. So first we need to clarify this point in the docs.

Secondly, we need to describe the issue with "duplication of case objects" and why it happens.

Thirdly, we could try to improve the library code to intercept case object serialization and then either fail fast (3.1) or redirect the entire serialization process to the proper serializers (3.2). I am not sure if this third point is doable, including its options 3.1 and 3.2.

@arnaud-daroussin this section https://github.com/flink-extended/flink-scala-api?tab=readme-ov-file#using-a-pojo-only-flink-serialization-framework describes the fallback to Kryo; I had missed this point as well.

@arnaud-daroussin
Contributor

Hi @novakov-alexey,

I think the documentation work is done!

About the code improvement, the ideal would be to handle typing done via Class params as well.

One solution could be to register all the types brought by flink-scala-api via TypeExtractor.registerFactory() (available only from Flink 1.19).

As for the TypeInformation instances generated for case classes, the call to registerFactory() could be made at the end of the code generated by the macro, because all the necessary information is there. For example, here is the decompiled Java code of the constructor of object ApiMessage:

TypeInformation apimessageTypeclass$macro$30 = var35.join(var38, var40, ((TypeTags)$u).TypeTag().apply((Mirror)$m, new ApiMessage$$typecreator29$1()));
this.info = apimessageTypeclass$macro$30;
this.bitmap$init$0 = (byte)(this.bitmap$init$0 | 32);

Although I have no idea how to do this with Magnolia.

Another possibility would be to use the cache (org.apache.flinkx.api.LowPrioImplicits#cache), filled at compile time by the case class derivation, and generate with another macro the code that performs the registration at runtime. However, I have no idea how to ensure that this last macro is expanded after all the macros that generated the case class instances.

In short, I'm trying to propose ideas, but the solution doesn't seem obvious!
