Deletion operations are not tracked #6

Open
tnk4on opened this issue Jun 21, 2023 · 2 comments

tnk4on commented Jun 21, 2023

I ran the following command, as described in the documentation:

DELETE FROM public.product where product_id = 7777;

However, the deleted product still appears in the search-service results.

As a result, steps 5 and 6 of the "Enhanced search capabilities for products" section do not work:

5. Delete the product directly on the retail database;
6. Confirm that the product no longer shows up in the search service.

A related issue has been open for some time and still appears to be unfixed:
solution-pattern-cdc/elastic-connector#1

tnk4on commented Jun 21, 2023

Kafdrop Topic Messages:

Offset: 1000   Key: {"product_id":7777}   Timestamp: 2023-06-21 11:58:13.777 Headers: empty
 
{
   "before": null,
   "after": {
      "product_id": 7777,
      "name": "Kopi luwak",
      "description": "Kopi luwak is a coffee that consists of partially digested coffee cherries, which have been eaten and defecated by the Asian palm civet (Paradoxurus hermaphroditus). It is produced mainly on the Indonesian islands of Sumatra, Java, Bali, Sulawesi, and in East Timor.",
      "price": "20.00"
   },
   "source": {
      "version": "1.9.7.Final",
      "connector": "postgresql",
      "name": "retail.updates",
      "ts_ms": 1687348693163,
      "snapshot": "false",
      "db": "retail",
      "sequence": "[null,\"24175520\"]",
      "schema": "public",
      "table": "product",
      "txId": 1629,
      "lsn": 24175520,
      "xmin": null
   },
   "op": "c",
   "ts_ms": 1687348693499,
   "transaction": null
}
Offset: 1001   Key: {"product_id":7777}   Timestamp: 2023-06-21 11:59:26.930 Headers: empty
 
{
   "before": {
      "product_id": 7777,
      "name": "",
      "description": null,
      "price": null
   },
   "after": null,
   "source": {
      "version": "1.9.7.Final",
      "connector": "postgresql",
      "name": "retail.updates",
      "ts_ms": 1687348766627,
      "snapshot": "false",
      "db": "retail",
      "sequence": "[\"24189208\",\"24189264\"]",
      "schema": "public",
      "table": "product",
      "txId": 1630,
      "lsn": 24189264,
      "xmin": null
   },
   "op": "d",
   "ts_ms": 1687348766841,
   "transaction": null
}
Offset: 1002   Key: {"product_id":7777}   Timestamp: 2023-06-21 11:59:26.930 Headers: empty
 
empty

tnk4on commented Jun 21, 2023

Log from the elastic-connector pod:

2023-06-21 11:59:26,943 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #1 - KafkaConsumer[retail.updates.public.product]) Failed delivery for (MessageId: 48B50DDF94E254A-0000000000000000 on ExchangeId: 48B50DDF94E254A-0000000000000000). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         kafka2Elastic/kafka2Elastic    from[kafka://retail.updates.public.product?autoOff      2020474
	...
                                         kafka2Elastic/process1         Processor@0x76b224cd                                          0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------: java.lang.NullPointerException
	at org.acme.retail.ProductRoute.lambda$configure$0(ProductRoute.java:31)
	at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:175)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
	at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:124)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:77)
	at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:318)
	at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:158)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

2023-06-21 11:59:26,951 WARN  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #1 - KafkaConsumer[retail.updates.public.product]) Error during processing. Exchange[48B50DDF94E254A-0000000000000000]. Caused by: [java.lang.NullPointerException - null]: java.lang.NullPointerException
	at org.acme.retail.ProductRoute.lambda$configure$0(ProductRoute.java:31)
	at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:175)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
	at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:124)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:77)
	at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:318)
	at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:158)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

2023-06-21 11:59:26,961 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #1 - KafkaConsumer[retail.updates.public.product]) Failed delivery for (MessageId: 48B50DDF94E254A-0000000000000001 on ExchangeId: 48B50DDF94E254A-0000000000000001). Exhausted after delivery attempt: 1 caught: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream on: Message. Caused by: No type converter available to convert from type: null to the required type: java.io.InputStream. Exchange[]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.io.InputStream]

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         kafka2Elastic/kafka2Elastic    from[kafka://retail.updates.public.product?autoOff      2020493
	...
                                         kafka2Elastic/unmarshal1       unmarshal[org.apache.camel.model.dataformat.JsonDa            0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream on: Message. Caused by: No type converter available to convert from type: null to the required type: java.io.InputStream. Exchange[]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.io.InputStream]
	at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125)
	at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:70)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:175)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
	at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:124)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:77)
	at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:318)
	at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:158)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: null to the required type: java.io.InputStream
	at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:274)
	at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123)
	... 18 more

2023-06-21 11:59:26,961 WARN  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #1 - KafkaConsumer[retail.updates.public.product]) Error during processing. Exchange[48B50DDF94E254A-0000000000000001]. Caused by: [org.apache.camel.InvalidPayloadException - No body available of type: java.io.InputStream on: Message. Caused by: No type converter available to convert from type: null to the required type: java.io.InputStream. Exchange[]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.io.InputStream]]: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream on: Message. Caused by: No type converter available to convert from type: null to the required type: java.io.InputStream. Exchange[]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.io.InputStream]
	at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125)
	at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:70)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:175)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
	at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:124)
	at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:77)
	at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:318)
	at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:158)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: null to the required type: java.io.InputStream
	at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:274)
	at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123)
	... 18 more
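
Reading the topic messages and the log together, the two failures appear to line up with the last two records. The NullPointerException at ProductRoute.java:31 is logged for the first exchange, right after the delete event at offset 1001 (where "after" is null). The InvalidPayloadException in unmarshal1 is logged for the second exchange, right after the record at offset 1002, whose value is empty (a tombstone, which Debezium emits after a delete by default). The following is a minimal sketch, written in Python rather than the connector's Java, of how a consumer could tell the three record kinds apart before reading "after". The function name classify_change_event and the action labels "index", "delete" and "skip" are hypothetical and are not taken from ProductRoute; the actual fix in the route may look different.

```python
import json
from typing import Optional, Tuple


def classify_change_event(key: str, value: Optional[str]) -> Tuple[str, int, Optional[dict]]:
    """Map one change record to an (action, product_id, document) tuple.

    Hypothetical helper for illustration only; it assumes the key and value
    are JSON strings shaped like the Kafdrop output in this issue.
    """
    product_id = json.loads(key)["product_id"]

    # Tombstone record: the value is null/empty, so there is nothing to parse.
    if not value:
        return ("skip", product_id, None)

    event = json.loads(value)

    # Delete event: "after" is null, so the id has to come from the key
    # (or from "before") instead of from "after".
    if event.get("op") == "d":
        return ("delete", product_id, None)

    after = event.get("after")
    if after is None:
        # Unexpected shape: do not dereference a missing "after".
        return ("skip", product_id, None)

    # Create, update and snapshot-read events carry the new row in "after".
    return ("index", product_id, after)


# Trimmed versions of the three records shown above (offsets 1000-1002).
KEY = '{"product_id":7777}'
CREATE_EVENT = '{"before": null, "after": {"product_id": 7777, "name": "Kopi luwak", "price": "20.00"}, "op": "c"}'
DELETE_EVENT = '{"before": {"product_id": 7777, "name": "", "description": null, "price": null}, "after": null, "op": "d"}'
TOMBSTONE = None
```

With a check like this in front of the indexing step, the "delete" action could be translated into a delete request against the search index for the given product_id, and the tombstone could be ignored instead of being passed to the JSON unmarshaller.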
