[Enhancement] Enhance Logging for Sink Error (java.lang.Exception: Sink Error) to Include More Context #23934

Open
mukesh154 opened this issue Feb 6, 2025 · 0 comments
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

I created a Debezium PostgreSQL source connector. When an entry in one of the tables exceeded 5 MB, the source failed with a "flush failed" error. However, the cause of that error is reported only as "Sink Error", which is not enough context for troubleshooting. A preceding log line does indicate that the message size exceeds the maximum allowed size, but the error log itself lacks detail.

Currently, the log includes a generic message:
Caused by: java.lang.Exception: Sink Error
This message gives almost no context and does not help identify the root cause of the "Sink Error". More detailed logging would make debugging much faster. Useful details would include the size of the message that caused the failure, the producer or consumer involved, and the stage of the sink process where the error occurred.
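To illustrate the kind of message I have in mind, here is a minimal, self-contained sketch. It is not the actual Pulsar code: `sinkError`, the topic name, and the payload size are hypothetical, and `IllegalArgumentException` only stands in for the client's `InvalidMessageException`. The point is that the wrapper exception carries both a descriptive message and the original failure as its cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SinkErrorContextSketch {

    /**
     * Hypothetical helper: builds a "Sink Error" that names the topic and
     * payload size and keeps the original publish failure as its cause.
     */
    public static Exception sinkError(String topic, int payloadBytes, Throwable cause) {
        String message = String.format(
                "Sink Error: failed to publish record to topic '%s' (payload %d bytes): %s",
                topic, payloadBytes, cause.getMessage());
        return new Exception(message, cause);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the flush future that the source waits on.
        CompletableFuture<Void> flushFuture = new CompletableFuture<>();

        // Stand-in for the client-side failure (assumed, not the real exception type).
        Throwable publishFailure =
                new IllegalArgumentException("Message size is bigger than 5242880 bytes");

        // Topic name and payload size below are made up for the example.
        flushFuture.completeExceptionally(
                sinkError("persistent://public/default/example-topic", 6_000_000, publishFailure));

        try {
            flushFuture.get();
        } catch (ExecutionException e) {
            Throwable sinkFailure = e.getCause();
            // The descriptive message is available directly...
            System.out.println(sinkFailure.getMessage());
            // ...and the root cause is still reachable for the stack trace.
            System.out.println("root cause: " + sinkFailure.getCause());
        }
    }
}
```

With the cause attached, a logger that prints the exception would also print a nested "Caused by:" entry for the original failure instead of stopping at "Sink Error".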

Here is the complete stacktrace:

[org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes]
2025-02-04T05:05:26,014+0000 [public/default/postgres-dbz-0] ERROR org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource - execution exception while get flushFuture
java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005) ~[?:?]
	at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource.read(AbstractKafkaConnectSource.java:191) ~[pulsar-io-kafka-connect-adaptor-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:491) ~[?:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:307) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.Exception: Sink Error
	at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource$AbstractKafkaSourceRecord.fail(AbstractKafkaConnectSource.java:326) ~[pulsar-io-kafka-connect-adaptor-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getPublishErrorHandler$1(PulsarSink.java:191) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) ~[?:?]
	at org.apache.pulsar.client.impl.ProducerImpl$1.sendComplete(ProducerImpl.java:350) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.client.impl.BatchMessageContainerImpl.discard(BatchMessageContainerImpl.java:193) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.client.impl.BatchMessageContainerImpl.createOpSendMsg(BatchMessageContainerImpl.java:214) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:2034) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$14(ProducerImpl.java:1654) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
	at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) ~[pulsar-common-2.10.5.5.jar:2.10.5.5]
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:159) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403) ~[netty-transport-classes-epoll-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
	... 1 more

Solution

Enhance the "Sink Error" log to include additional context. If the error is related to a specific condition, such as exceeding the maximum message size or a connection issue, state that directly in the log message. For example, if the failure was caused by a connection timeout or a message size violation, the log should say so clearly.
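As a rough sketch of that idea, the error handler could walk the cause chain and pick a log message based on the kind of failure it finds. Everything here is an assumption for illustration: `SinkFailureClassifier`, `FailureKind`, and the message texts are hypothetical, and the nested `InvalidMessageException` is only a stand-in for `PulsarClientException$InvalidMessageException` from the log above so that the example compiles without Pulsar on the classpath.

```java
import java.util.concurrent.TimeoutException;

public class SinkFailureClassifier {

    /** Stand-in for the client's InvalidMessageException (assumption for this sketch). */
    public static class InvalidMessageException extends Exception {
        public InvalidMessageException(String message) {
            super(message);
        }
    }

    /** Hypothetical failure categories that a log message could name explicitly. */
    public enum FailureKind { INVALID_MESSAGE, TIMEOUT, UNKNOWN }

    /** Walks the cause chain and returns the first category that matches. */
    public static FailureKind classify(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof InvalidMessageException) {
                return FailureKind.INVALID_MESSAGE;
            }
            if (t instanceof TimeoutException) {
                return FailureKind.TIMEOUT;
            }
        }
        return FailureKind.UNKNOWN;
    }

    /** Builds a log line that states the condition instead of a bare "Sink Error". */
    public static String describe(Throwable error) {
        switch (classify(error)) {
            case INVALID_MESSAGE:
                return "Sink Error: message rejected by the producer: " + error.getMessage();
            case TIMEOUT:
                return "Sink Error: publish timed out: " + error.getMessage();
            default:
                return "Sink Error: " + error;
        }
    }

    public static void main(String[] args) {
        Throwable tooLarge =
                new InvalidMessageException("Message size is bigger than 5242880 bytes");
        System.out.println(describe(tooLarge));
        System.out.println(describe(new TimeoutException("connection timed out")));
        System.out.println(describe(new IllegalStateException("something else")));
    }
}
```

Checking exception types rather than parsing message strings keeps the classification stable if the wording of the underlying errors changes.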

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@mukesh154 mukesh154 changed the title Enhance Logging for Sink Error (java.lang.Exception: Sink Error) to Include More Context [Enhancement] Enhance Logging for Sink Error (java.lang.Exception: Sink Error) to Include More Context Feb 6, 2025