Getting java.util.NoSuchElementException sporadically! #30

theGreatHeisenberg opened this issue Oct 7, 2016 · 1 comment

@theGreatHeisenberg

I am getting the following error while calling ack() on OutputCollector.

Can anybody help me find the cause of this exception?

2016-10-07 08:46:46.856 [Thread-19-JMS_QUEUE_SPOUT] WARN  Message failed: org.apache.storm.jms.spout.JmsMessageID@77eb
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR  Async loop died!
java.lang.RuntimeException: java.util.NoSuchElementException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:619) [na:1.6.0_21]
Caused by: java.util.NoSuchElementException: null
        at java.util.TreeMap.key(TreeMap.java:1206) ~[na:1.6.0_21]
        at java.util.TreeMap.firstKey(TreeMap.java:267) ~[na:1.6.0_21]
        at java.util.TreeSet.first(TreeSet.java:377) ~[na:1.6.0_21]
        at org.apache.storm.jms.spout.JmsSpout.ack(JmsSpout.java:243) ~[stormjar.jar:3.7]
        at backtype.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:384) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$tuple_action_fn__4660.invoke(executor.clj:446) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
        ... 6 common frames omitted
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR
java.lang.RuntimeException: java.util.NoSuchElementException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

I am setting up my topology with a JmsSpout as shown in the example, and the spout connects to ActiveMQ. Everything works fine most of the time, but sporadically the above stack trace appears in my logs and one of the spout instances stops dequeuing messages from ActiveMQ. When I re-submit my topology, everything goes back to normal.

JmsProvider jmsQueueProvider = new SpringJmsProvider(
    "file:" + options.activemqConfFile,
    Constants.JMS_CONNECTION_FACTORY,
    options.queueName
);
JmsTupleProducer producer = new JsonTupleProducer();

// JMS Queue Spout
JmsSpout queueSpout = new JmsSpout();
queueSpout.setJmsProvider(jmsQueueProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
queueSpout.setDistributed(true); // allow multiple instances

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(Constants.JMS_QUEUE_SPOUT, queueSpout, 5);

What could be the problem? How do I debug this?

P.S.: I am not setting a recoveringPeriod, so it defaults to -1 (in case this is relevant).

@theGreatHeisenberg

I still cannot figure out what is wrong with the above usage of the spout.
While debugging further, I looked at the fail() and ack() methods in JmsSpout:


public void fail(Object msgId) {
    LOG.warn("Message failed: " + msgId);
    this.pendingMessages.clear();
    this.toCommit.clear();
    synchronized (this.recoveryMutex) {
        this.hasFailures = true;
    }
}

public void ack(Object msgId) {
    Message msg = this.pendingMessages.remove(msgId);
    JmsMessageID oldest = this.toCommit.first();
    if (msgId.equals(oldest)) {
        if (msg != null) {
            try {
                LOG.debug("Committing...");
                msg.acknowledge();
                LOG.debug("JMS Message acked: " + msgId);
                this.toCommit.remove(msgId);
            } catch (JMSException e) {
                LOG.warn("Error acknowldging JMS message: " + msgId, e);
            }
        } else {
            LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
        }
    } else {
        this.toCommit.remove(msgId);
    }
}

Even though fail() is called for one particular message ID, it clears the entire toCommit TreeSet. If another tuple from the same spout instance is acked afterwards, this.toCommit.first() is called on an empty set and throws java.util.NoSuchElementException. The spout thread then dies (the "Async loop died!" line in the log), and the topology only recovers after we kill and re-submit it.
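
The sequence described above can be reproduced outside Storm with a small stand-alone model. This is only a sketch under stated assumptions, not the real JmsSpout: the class AckAfterFailSketch and the methods emit, ackUnguarded and ackGuarded are hypothetical names, Long stands in for JmsMessageID, and String stands in for the JMS Message. The isEmpty() check is one possible guard, not a confirmed fix from the maintainers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeSet;

/** Stand-alone model of the spout's bookkeeping (hypothetical, not the real JmsSpout). */
public class AckAfterFailSketch {
    // Assumption: Long replaces JmsMessageID, String replaces javax.jms.Message.
    final TreeSet<Long> toCommit = new TreeSet<>();
    final Map<Long, String> pendingMessages = new HashMap<>();

    // Records a message as emitted and awaiting an ack.
    void emit(long id) {
        pendingMessages.put(id, "payload-" + id);
        toCommit.add(id);
    }

    // Mirrors the quoted fail(): all bookkeeping is dropped, not just the failed ID.
    void fail(long id) {
        pendingMessages.clear();
        toCommit.clear();
    }

    // Mirrors the start of the quoted ack(): first() throws if the set is empty.
    void ackUnguarded(long id) {
        pendingMessages.remove(id);
        Long oldest = toCommit.first(); // NoSuchElementException on an empty TreeSet
        toCommit.remove(id);
    }

    // Possible guard: skip the commit logic when an earlier fail() cleared the state.
    // Returns true only if the message was still tracked.
    boolean ackGuarded(long id) {
        String msg = pendingMessages.remove(id);
        if (toCommit.isEmpty()) {
            return false; // nothing left to commit for this ID
        }
        toCommit.remove(id);
        return msg != null;
    }

    public static void main(String[] args) {
        AckAfterFailSketch spout = new AckAfterFailSketch();
        spout.emit(1L);
        spout.emit(2L);
        spout.fail(1L); // clears the bookkeeping for message 2 as well

        try {
            spout.ackUnguarded(2L);
        } catch (NoSuchElementException e) {
            System.out.println("unguarded ack threw NoSuchElementException");
        }
        System.out.println("guarded ack handled: " + spout.ackGuarded(2L));
    }
}
```

With the guard, a late ack for a message whose state was already cleared is ignored instead of killing the thread. Whether ignoring it is the right behaviour for CLIENT_ACKNOWLEDGE redelivery is a separate question that this sketch does not answer.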

Any help on this is really appreciated.
