-
Notifications
You must be signed in to change notification settings - Fork 155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
java.lang.IllegalMonitorStateException if thread is interrupted during NatsConnection.publish() #1250
Comments
This same exception also occurs if you're on a pull subscription and in anything that's blocking such as fetch(). That exception is pretty scary/confusing as it's sourced from trying to unlock a lock that isn't owned by the thread. I haven't dug much into what's going on, but I fear a lock isn't actually being unlocked even though the fetch() returns since its interrupted. java.lang.IllegalMonitorStateException: null
at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:175)
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1059)
at java.base/java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:494)
at io.nats.client.impl.MessageQueue.push(MessageQueue.java:187)
at io.nats.client.impl.MessageQueue.push(MessageQueue.java:139)
at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:233)
at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1682)
at io.nats.client.impl.NatsConnection.publishInternal(NatsConnection.java:992)
at io.nats.client.impl.NatsJetStreamPullSubscription._pull(NatsJetStreamPullSubscription.java:65)
at io.nats.client.impl.NatsJetStreamPullSubscription._fetch(NatsJetStreamPullSubscription.java:144)
at io.nats.client.impl.NatsJetStreamPullSubscription.fetch(NatsJetStreamPullSubscription.java:128)
at com.fizzed.nats.demo.NatsStreamConsumer.lambda$execute$0(NatsStreamConsumer.java:44)
at java.base/java.lang.Thread.run(Thread.java:1583) |
Are you using any unusual futures or threading model? What version of Java are you running on? Any idea what is causing the interrupt? I'll have to go through all the locks, but they should all be done in finally blocks. |
Nothing fancy, just a simple example as I was exploring nats.java yesterday. Here is the code that causes it. The "subscriberThread.interrupt()" triggers that exception if the subscriber thread is in the fetch() method. |
My goal is to have the client behave as well as possible. I'm looking into this to see if there is anything I can add to the code to make it not freeze. I will also go through the code to make sure my locks are all unlocked in finally, I'm 99.9% certain they are all. I'm by no means the threading expert but I recently went through all of our code to move completely to ReentrantLock. I did a lot of reading and what I learned is that you have to build your loops to be able to yield and stop. Interrupting just throws a wrench in the works and the result is pretty much undefined. I'm looking at your code example. There is no reason for you to interrupt the subscriber thread. If you absolutely must shut it down, you should have a flag (AtomicBoolean probably) in your while loop and set that, stop fetching messages and exit the loop gracefully. In any case, I don't even think you need to do this. The fetch will recover and re-subscribe once the connection is re-established. I also think you should have your exception handling for any JetStream calls (like fetch) outside of the while loop, because they are pretty much terminal errors. |
@scottf I've been working in Java for over 20 years and so I'll relay some of my thread/atomic knowledge. Interrupting a thread is exactly how you signal you want blocking calls to stop blocking. The fetch() call blocks on I/O, interrupting that thread is how you'd signal you want the thread to shutdown. In a production app, I'd have an AtomicBoolean that signals I want the thread to stop, I'd then interrupt the thread, then some try/catch in the thread's run() would check that stop was signaled, and exit the run(). What you've described is almost more like a "spin lock" where it polls for X seconds, then decides its being shutdown. That's not a great design when you're trying to handle things like an app gracefully shutting down. Also, this is how the Java ExecutorService shutdownNow() method works -- the javadocs explains that the executor service will do an interrupt on the threads to signal them to be shutdown: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ExecutorService.html#shutdownNow() Now, you'll come back with something like, only wait X seconds on the fetch(), put that in a loop, check the stop signal or something. That'll kind of work, but that wouldn't guarantee an interrupt won't be called, won't be triggered in the fetch(), especially as I pointed out that's exactly how the ExecutorService in Java triggers a shutdown. Also, its good practice to now "swallow" interrupts if your own library code is in turn also blocking on something. That's why Java's lock APIs all have InterruptedException's get thrown, since that's another area that can block and interrupts should be handled properly and propagated up the chain if necessary. You can handle it in your own try/catch, but then its good to re-throw it so the consumer's of your library can also appropriately handle the interrupt -- since their own app or themselves are the ones that triggered it. |
One last thing, I was testing scenarios of the nats server connection being disrupted (ethernet, being shutdown, etc.) while an app was running. If you call the fetch() method, and while its blocking the underlying connection gets re-connected, then fetch() never returns any new messages. You MUST call fetch() again in that situation to get new messages. So despite the fancy re-connect handling under-the-hood, it doesn't actually help my app if the fetch() never returns any new messages. The documentation isn't clear that would be the case and I think the fact "re-connect" is a feature, I expected fetch() to just work once the connection was re-established. The other problem is you don't know a re-connect occurred while your code is calling fetch(). So I added code to add a connection listener, get the connection closed event. Then I needed to tell the subscriber thread that something happened, so I used the interrupt technique. That's how I discovered what I think could be a problem -- it hints that maybe a lock isn't actually unlocked, but I haven't dug too much into it. |
You are supposed to handle the InterruptedException, clean up your state and exit your thread gracefully. Your example code does not do that. The fetch call in your example already has a time out. An exception thrown via any JetStream call is probably unrecoverable and you might need to make a new subscription. A disconnection is a recoverable situation that can be accounted for. I suggest actually moving to the simplified API which recovers on disconnects. The fetch in your example can too, I have an example somewhere, but there is more to add to your code to handle it. As I said, I'm not suggesting, by any means that it's "not my problem". I'm looking into it. Just a quick glance at that push() function with fresh eyes, I can see something I should dig into. And I'm trying to help here. Please keep your comments respectful. I've been working with Java since 1995 so I know a little bit too. The software is free and open source, so if you have a solution that I don't see, feel free to make a PR. |
I'm not trying to be difficult, just trying to give nats a try to see if we'll use it as a key part of our platform. You mentioned some newness on threading, so I shared my background. Did not mean that as a dig :-) The fetch() call is currently throwing a java.lang.IllegalMonitorStateException if the thread executing it is interrupted (see my stacktrace above). If it was throwing an InterruptedException, that's actually what I'd expect it to be doing and I'd be happy. The IllegalMonitorStateException is a scary exception since it means somewhere in the stacktrace I shared, a lock is trying to be released, but its not actually owned by the thread that locked it. |
@jjlauer I built a snapshot of my PR branch 2.20.5.edit-lock-handling-SNAPSHOT. It's only on Sonatype, the repo readme has gradle and maven dependency examples. If it's easier we can move this conversation to slack, I'm scottf on there. I really want to solve this problem. I'm in US East so will be signing off tonight, but back at it tommorow. |
@scottf I tried out that branch/PR -- I cloned this repo, checked out that branch, published locally. The IllegalMonitorStateException no longer gets thrown, but now fetch() immediately returns with no messages once it's been interrupted. So fetch() does technically "swallow" the InterruptedException. Also, the problem is if fetch() is called again (w/ the server's connection still being down), the fetch() call immediately returns again (w/o honoring the Duration I passed in). In my example project, this lead to a full blast never ending loop that was difficult to kill the java process to get out of it. Now if I add a Thread.sleep() at the bottom of my loop, the fetch() call definitely will quickly return no messages while the server connection is down. Once the server connection comes back up, the new fetch() call did work to get messages. I guess this leads to the question -- if the underlying connection is down, what behavior should fetch() have? or other methods like pull(), etc? IMHO, they either should throw an exception telling you the connection is down or they should honor the Duration I passed in and if the server comes back up, they return a message. |
The subscription is invalid once the connection is broken and fetch will never work. You need to resubscribe after the interrupt. |
There is an example recoverable consume here: https://gist.github.com/scottf/5e22d3167452cfe54da692a2c862fa92 |
Observed behavior
My application uses thread interruption to signal that work should be aborted. When a thread is interrupted before making a call to anything that tries to publish a message to a NATS Connection, the call to publish throws an IllegalMonitorStateException.
Here's an example exception stack from a call to
KeyValue.create()
.Expected behavior
Calls to NATS should either:
Server and client version
Nats Server: 2.10.20
Nats Java client: 2.20.2
Host environment
Amazon Linux 2 within docker on c5a.2xlarge EC2 instance. Container limited to 2GiB RAM and 3vCPU.
Steps to reproduce
The text was updated successfully, but these errors were encountered: