Search before asking
I searched in the issues and found nothing similar.
Paimon version
0.9.0
Compute Engine
Flink 1.18.1
Minimal reproduce step
You can replicate this in a local environment with any schema. Use the Flink Paimon action JAR to sink Kafka CDC data into Paimon. For example:
Schema: {'update_time': DATE}
Produce data like: {'update_time': -2954}
This will cause the Flink ingestion job to run into an infinite loop waiting for the schema to update.
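For context on why the conversion fails: `-2954` looks like a days-since-epoch encoding of the date, but Paimon's string-to-DATE cast expects a date literal, so parsing throws. A minimal standalone sketch of the same failure using plain `java.time` (not Paimon's actual `BinaryStringUtils.toDate`):

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;

public class DateCastDemo {
    public static void main(String[] args) {
        try {
            // A string-based date parse rejects "-2954", just like the
            // cast in the log below: it is not a yyyy-MM-dd literal.
            LocalDate.parse("-2954");
        } catch (DateTimeParseException e) {
            System.out.println("Parse failed: " + e.getMessage());
        }
        // Interpreting it as epoch days instead yields a valid date:
        System.out.println(LocalDate.ofEpochDay(-2954)); // 1961-11-30
    }
}
```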
Error example:
2024-09-23 05:32:12,439 [] INFO org.apache.paimon.flink.sink.cdc.CdcRecordUtils [] - Failed to convert value [{"update_time": -2954}] to type ROW<update_time DATE 'update_time'>. Waiting for schema update.
java.lang.RuntimeException: Failed to parse Json String {"update_time": -2954}
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:192) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromCdcValueString(TypeUtils.java:96) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow(CdcRecordUtils.java:105) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.processElement(CdcRecordStoreWriteOperator.java:80) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) [flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.0.jar:1.18.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to parse Json String {"update_time": -2954}
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:259) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:177) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
... 16 more
Caused by: java.time.DateTimeException: For input string: '-2954'.
at org.apache.paimon.utils.BinaryStringUtils.toDate(BinaryStringUtils.java:289) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:157) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:243) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:177) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
... 16 more
What doesn't meet your expectations?
This caused the job to run into an infinite loop and fail checkpointing. I expected it to fail loudly with an exception saying the message it received was corrupted, instead of silently spinning in an infinite loop. All the exceptions in the Flink UI pointed to checkpoint timeouts.
Anything else?
No response
Are you willing to submit a PR?
I'm willing to submit a PR!
A possible way to tackle this is to retry a configurable number of times before bubbling up the exception. We could add a config `cdc.retry-num-times` with a default of 3; if the conversion to GenericRow still fails after those retries, we fail the job by bubbling up the exception. A rough sketch of the idea is below.
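A minimal sketch of that bounded retry, assuming a hypothetical config key `cdc.retry-num-times` and illustrative helper names (not Paimon's actual operator API):

```java
import java.util.Optional;

// Illustrative only: cdc.retry-num-times is a proposed config key, and
// tryConvert/waitForSchemaUpdate are stand-ins for Paimon internals.
class BoundedRetrySketch {
    static final int MAX_RETRIES = 3; // would be read from cdc.retry-num-times

    Object convertOrFail(Object record) throws InterruptedException {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            Optional<Object> row = tryConvert(record); // cf. CdcRecordUtils.toGenericRow
            if (row.isPresent()) {
                return row.get();
            }
            waitForSchemaUpdate(); // current behavior: refresh schema, then retry
        }
        // Proposed behavior: fail loudly instead of looping forever.
        throw new RuntimeException(
                "Failed to convert CDC record after " + MAX_RETRIES + " retries: " + record);
    }

    Optional<Object> tryConvert(Object record) { return Optional.empty(); } // stub
    void waitForSchemaUpdate() throws InterruptedException { Thread.sleep(500); } // stub
}
```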
@JingsongLi alternatively, if it is a corrupt record, it will keep failing unless the record is somehow skipped. Do you think we should also provide an option to skip such records, with skipping disabled by default?
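A sketch of how that skip path could sit on top of the retry above, assuming a hypothetical boolean config (say `cdc.skip-corrupt-record`, default false); names are illustrative:

```java
import java.util.Optional;

// Illustrative only: cdc.skip-corrupt-record is a hypothetical config name.
class SkipCorruptRecordSketch {
    boolean skipCorruptRecord = false; // default: fail instead of skipping

    void process(Object record, Optional<Object> converted) {
        if (converted.isEmpty()) {
            if (skipCorruptRecord) {
                // Drop the corrupt record and keep the job alive.
                System.err.println("Skipping corrupt CDC record: " + record);
                return;
            }
            throw new RuntimeException("Corrupt CDC record: " + record);
        }
        // ... otherwise write converted.get() to Paimon as today ...
    }
}
```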