Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Couldn't find process instance" while using async communication #51

Open
ashulinskiy opened this issue Apr 25, 2019 · 2 comments
Open

Comments

@ashulinskiy
Copy link

ashulinskiy commented Apr 25, 2019

Hi folks,

Another issue I could use some assistance with please.

I have a BPMN which uses the Camunda-Camel library to communicate to a third party service through Kafka, i.e.:

  1. A BPMN Send Task of a BPMN process sends a message to a Camel route.
    That task is the fist one in the BPMN process which may be important.
  2. The Camel route forwards the message to the "FromCamunda"Kafka topic.
  3. A third party service reads from the "FromCamunda" Kafka topic.
  4. The third party service does some processing and sends the reply to the "ToCamunda" Kafka topic.
  5. Another camel route configured on the same Camunda server as the one in steps 1-2 reads the message from the "ToCamunda" Kafka topic.
  6. The route forwards the message to the same Camunda BPMN process which sent the outgoing message in steps 1-2.
    The BPMN Process Instance ID is used for the correlation.

The routing sometime breaks with the following error:

2019-04-25 11:56:33,696 ERROR o.a.c.p.DefaultErrorHandler [-1) thread #1 - KafkaConsumer[toCamunda]] -> Failed delivery for (MessageId: ID-LAPTOP-9OG51EMC-1556205751527-0-4 on ExchangeId: ID-LAPTOP-9OG51EMC-1556205751527-0-3). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[asyncResponse     ] [asyncResponse     ] [kafka://localhost:9092?autoOffsetReset=earliest&brokers=localhost%3A9092&consu] [       100]
[asyncResponse     ] [unmarshal1        ] [unmarshal[org.apache.camel.model.dataformat.JsonDataFormat@473c9784]          ] [        22]
[asyncResponse     ] [process2          ] [Processor@0x3a92a57c                                                          ] [         0]
[asyncResponse     ] [to2               ] [camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData ] [        75]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
	at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
	at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
2019-04-25 11:56:33,698 WARN  o.a.c.c.k.KafkaConsumer [-1) thread #1 - KafkaConsumer[toCamunda]] -> Error during processing. Exchange[ID-LAPTOP-9OG51EMC-1556205751527-0-3]. Caused by: [java.lang.RuntimeException - Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer']
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
	at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
	at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

The BPMN:

<?xml version="1.0" encoding="UTF-8"?> <bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="Definitions_0wxf2ek" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="2.2.4"> <bpmn:message id="Message_1buba0w" name="camel.answer" /> <bpmn:message id="Message_19viyxu" name="camel.answer" /> <bpmn:process id="camel_asynch" name="Camel async" isExecutable="true"> <bpmn:endEvent id="EndEvent_1egevrf"> <bpmn:incoming>SequenceFlow_0aodgcz</bpmn:incoming> </bpmn:endEvent> <bpmn:sendTask id="SendTask_1ouqvcf" name="Call async service" camunda:expression="#{camel.sendTo(&#39;direct:asyncService&#39;)}" camunda:resultVariable="aggregateData"> <bpmn:incoming>SequenceFlow_1yw1ho6</bpmn:incoming> <bpmn:outgoing>SequenceFlow_1i1ujc8</bpmn:outgoing> </bpmn:sendTask> <bpmn:sequenceFlow id="SequenceFlow_0aodgcz" sourceRef="IntermediateCatchEvent_133vikc" targetRef="EndEvent_1egevrf" /> <bpmn:sequenceFlow id="SequenceFlow_1yw1ho6" sourceRef="StartEvent_1cmiw6j" targetRef="SendTask_1ouqvcf" /> <bpmn:intermediateCatchEvent id="IntermediateCatchEvent_133vikc" name="message &#39;answer&#39; received"> <bpmn:incoming>SequenceFlow_1i1ujc8</bpmn:incoming> <bpmn:outgoing>SequenceFlow_0aodgcz</bpmn:outgoing> <bpmn:messageEventDefinition id="MessageEventDefinition_0anjhqa" messageRef="Message_0u86xqd" /> </bpmn:intermediateCatchEvent> <bpmn:sequenceFlow id="SequenceFlow_1i1ujc8" sourceRef="SendTask_1ouqvcf" targetRef="IntermediateCatchEvent_133vikc" /> <bpmn:startEvent id="StartEvent_1cmiw6j"> <bpmn:outgoing>SequenceFlow_1yw1ho6</bpmn:outgoing> </bpmn:startEvent> </bpmn:process> <bpmn:message id="Message_0u86xqd" name="camel.answer" /> <bpmndi:BPMNDiagram id="BPMNDiagram_1"> <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="camel_asynch"> <bpmndi:BPMNShape id="EndEvent_1egevrf_di" bpmnElement="EndEvent_1egevrf"> <dc:Bounds x="848" y="88" width="36" height="36" /> </bpmndi:BPMNShape> <bpmndi:BPMNShape id="SendTask_1ouqvcf_di" bpmnElement="SendTask_1ouqvcf"> <dc:Bounds x="380" y="66" width="100" height="80" /> </bpmndi:BPMNShape> <bpmndi:BPMNEdge id="SequenceFlow_0aodgcz_di" bpmnElement="SequenceFlow_0aodgcz"> <di:waypoint x="748" y="106" /> <di:waypoint x="848" y="106" /> </bpmndi:BPMNEdge> <bpmndi:BPMNEdge id="SequenceFlow_1yw1ho6_di" bpmnElement="SequenceFlow_1yw1ho6"> <di:waypoint x="101" y="106" /> <di:waypoint x="380" y="106" /> </bpmndi:BPMNEdge> <bpmndi:BPMNShape id="IntermediateCatchEvent_133vikc_di" bpmnElement="IntermediateCatchEvent_133vikc"> <dc:Bounds x="712" y="88" width="36" height="36" /> <bpmndi:BPMNLabel> <dc:Bounds x="687" y="131" width="88" height="27" /> </bpmndi:BPMNLabel> </bpmndi:BPMNShape> <bpmndi:BPMNEdge id="SequenceFlow_1i1ujc8_di" bpmnElement="SequenceFlow_1i1ujc8"> <di:waypoint x="480" y="106" /> <di:waypoint x="712" y="106" /> </bpmndi:BPMNEdge> <bpmndi:BPMNShape id="StartEvent_1cmiw6j_di" bpmnElement="StartEvent_1cmiw6j"> <dc:Bounds x="65" y="88" width="36" height="36" /> </bpmndi:BPMNShape> </bpmndi:BPMNPlane> </bpmndi:BPMNDiagram> </bpmn:definitions>

The Camel routes:

      from("direct://asyncService")
                .routeId("asyncService")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        Map inMap = (Map)exchange.getIn().getBody();
                        Map<String, String> kafkaOutParams = new HashMap<>();
                        kafkaOutParams.put("aggregateSchemaId", inMap.get("aggregateSchemaId?").toString());
                        kafkaOutParams.put("aggregateInstanceId", inMap.get("aggregateInstanceId?").toString());
                        exchange.getOut().setBody(kafkaOutParams, Map.class);
                        exchange.getOut().setHeader(KafkaConstants.KEY,
                            exchange.getProperties().get(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID));
                    }
                })
                .marshal().json(JsonLibrary.Jackson, Map.class)
                .convertBodyTo(String.class)
                .to("kafka:localhost:9092?topic=fromCamunda&brokers=localhost:9092");

        from("kafka:localhost:9092?topic=toCamunda&groupId=group_id&autoOffsetReset=earliest&consumersCount=1&brokers=localhost:9092")
            .routeId("asyncResponse")
            .unmarshal().json(JsonLibrary.Jackson, Map.class)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    exchange.setProperty(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID,
                        exchange.getIn().getHeader(KafkaConstants.KEY));
                    Map<String,Object> result = new HashMap<>();
                    result.put("aggregateData", exchange.getIn().getBody());
                    exchange.getIn().setBody(result, Map.class);
                }
            })
            .to("camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData");

As far as I understand there's a race condition between the thread in the Camunda process that sends the outgoing Kafka message and the thread that reads the Kafka message and uses the Process Instance ID to find the process to forward the message to.
I.e. when the correlation is done in the second thread the process hasn'd been committed to the DB in the first thread.

  • Does this theory sound right?

  • If it doesn't what else could be the problem

  • Any advice how to fix the issue?

Any help is greatly appreciated!

Thanks,
Andrey.

@ashulinskiy ashulinskiy changed the title "Couldn't find process instance" whe using async communication "Couldn't find process instance" while using async communication Apr 26, 2019
@umamaheswaran3003
Copy link

May I know the versions of camel, camunda, spring you are using. I am using current versions of all the dependencies, facing issue while server starts up. Please help!!!

@ashulinsky-korio
Copy link

It's been a while but I believe I have found the branch where that investigation happened.

Here's the POM with the versions we used back then, has the versions.

We ended up not using this approach so no further insight sorry.

`

4.0.0

<groupId>DSABackend</groupId>
<artifactId>DSABackend</artifactId>
<version>1.0</version>

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>


<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.1.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.camunda.bpm.springboot</groupId>
        <artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- TODO The OTB Camunda REST API is left in the project for convenience. MUST be removed before we go production!!! -->
    <dependency>
        <groupId>org.camunda.bpm.springboot</groupId>
        <artifactId>camunda-bpm-spring-boot-starter-rest</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-rest-webmvc -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-rest-webmvc</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
    </dependency>

    <!-- https://stackoverflow.com/questions/43574426/how-to-resolve-java-lang-noclassdeffounderror-javax-xml-bind-jaxbexception-in-j -->
    <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>2.2.11</version>
    </dependency>
    <dependency>
        <groupId>com.sun.xml.bind</groupId>
        <artifactId>jaxb-core</artifactId>
        <version>2.2.11</version>
    </dependency>
    <dependency>
        <groupId>com.sun.xml.bind</groupId>
        <artifactId>jaxb-impl</artifactId>
        <version>2.2.11</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>activation</artifactId>
        <version>1.1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/ch.qos.logback/ -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.2.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/ma.glasnost.orika/orika-core -->
    <dependency>
        <groupId>ma.glasnost.orika</groupId>
        <artifactId>orika-core</artifactId>
        <version>1.5.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.arangodb/arangodb-java-driver -->
    <dependency>
        <groupId>com.arangodb</groupId>
        <artifactId>arangodb-java-driver</artifactId>
        <version>5.0.4</version>
    </dependency>
    <dependency>
        <groupId>com.arangodb</groupId>
        <artifactId>arangodb-spring-data</artifactId>
        <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-tools -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-tools</artifactId>
        <version>1.8.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.5.RELEASE</version>
    </dependency>

    <!-- camunda BPM Apache Camel Integration -->
    <dependency>
        <groupId>org.camunda.bpm.extension.camel</groupId>
        <artifactId>camunda-bpm-camel-cdi</artifactId>
        <version>0.6</version>
        <exclusions>
            <exclusion>
                <artifactId>jaxb-impl</artifactId>
                <groupId>com.sun.xml.bind</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.camunda.bpm/camunda-engine -->
    <dependency>
        <groupId>org.camunda.bpm</groupId>
        <artifactId>camunda-engine</artifactId>
        <version>7.10.0</version>
    </dependency>

    <!-- Camel Components -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>2.23.2</version>
    </dependency>
    <!--<dependency>-->
    <!--<groupId>org.apache.camel</groupId>-->
    <!--<artifactId>camel-spring-boot</artifactId>-->
    <!--<version>2.23.2</version>-->
    <!--</dependency>-->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
        <version>2.23.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-cdi</artifactId>
        <version>2.23.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-groovy</artifactId>
        <version>2.23.2</version>
        <exclusions>
            <exclusion>
                <artifactId>jaxb-impl</artifactId>
                <groupId>com.sun.xml.bind</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-http</artifactId>
        <version>2.23.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jackson -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jackson</artifactId>
        <version>2.23.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-xstream -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-xstream</artifactId>
        <version>2.23.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
        <version>2.23.2</version>
        <!-- use the same version as your Camel core version -->
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-ognl</artifactId>
        <version>2.23.2</version>
    </dependency>

    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.5</version>
    </dependency>

    <!-- TEST DEPENDENCIES -->

    <!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>2.23.4</version>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.powermock/powermock-core -->
    <dependency>
        <groupId>org.powermock</groupId>
        <artifactId>powermock-core</artifactId>
        <version>2.0.0</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.powermock/powermock-module-junit4 -->
    <dependency>
        <groupId>org.powermock</groupId>
        <artifactId>powermock-module-junit4</artifactId>
        <version>2.0.0</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.powermock/powermock-api-mockito2 -->
    <dependency>
        <groupId>org.powermock</groupId>
        <artifactId>powermock-api-mockito2</artifactId>
        <version>2.0.0</version>
        <scope>test</scope>
    </dependency>


</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <layout>ZIP</layout>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>
`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants