Revisit Write Timeout Handling #1128
@@ -279,7 +281,7 @@ public void run() {
  nc.publish(subject, payload);
  success = true;
  } catch (IllegalStateException ex) {
- if (ex.getMessage().contains("Output queue is full")) {
+ if (ex.getMessage().contains(OUTPUT_QUEUE_IS_FULL)) {
Housekeeping: made this text a constant since it's used in multiple places.
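A minimal sketch of that housekeeping, for illustration only. The constant name and message text come from the diff above; the `QueueFullCheck` class, the `isOutputQueueFull` helper, and the null guard are assumptions added here and are not part of the PR.

```java
// Hypothetical helper class; only the constant name and its text appear in the diff.
final class QueueFullCheck {
    // Shared constant so every call site compares against the same text.
    static final String OUTPUT_QUEUE_IS_FULL = "Output queue is full";

    // Returns true when the exception message contains the shared text.
    // The null check is an addition: getMessage() can return null.
    static boolean isOutputQueueFull(IllegalStateException ex) {
        String message = ex.getMessage();
        return message != null && message.contains(OUTPUT_QUEUE_IS_FULL);
    }
}
```

With a helper like this, a call site would read `if (QueueFullCheck.isOutputQueueFull(ex)) { ... }` instead of repeating the literal.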
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException the forceReconnect fails
+ * @throws InterruptedException the connection is not connected
Since I was dealing with forceReconnect, I noticed this doc was missing.
@@ -146,7 +146,7 @@ default void socketWriteTimeout(Connection conn) {}
  * @param sub the JetStreamSubscription that this occurred on, if applicable
  * @param pairs custom string pairs. I.E. "foo: ", fooObject, "bar-", barObject will be appended
  * to the message like ", foo: <fooValue>, bar-<barValue>".
- * @return
+ * @return the message
This doc was missing too.
connection.forceReconnect();
}
catch (IOException | InterruptedException ignore) {}
});
I decided to run this on another thread, since this whole data port implementation object will be closed during the forceReconnect.
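The idea can be sketched as follows. This is not the PR's code: `ReconnectOffThread`, `Reconnectable`, and `requestReconnect` are hypothetical names, and the interface models only the `forceReconnect()` call and its checked exceptions as they appear in the diff. Whether the real code uses `execute` or `submit`, and which executor it uses, is not shown in the excerpt.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch: hand the reconnect to an executor thread so the
// caller (for example a write-timeout watcher) is not the thread that
// ends up closing its own parent object.
final class ReconnectOffThread {
    // Stand-in for the connection; only the method named in the diff is modeled.
    interface Reconnectable {
        void forceReconnect() throws IOException, InterruptedException;
    }

    static void requestReconnect(ExecutorService executor, Reconnectable connection) {
        executor.execute(() -> {
            try {
                connection.forceReconnect();
            } catch (IOException | InterruptedException ignore) {
                // Ignored, mirroring the diff; a failure here has no caller to report to.
            }
        });
    }
}
```

The calling thread returns as soon as the task is queued, so closing the data port during the reconnect cannot interrupt the thread that detected the timeout.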
catch (Exception ignore) {
// don't want this to be passed along
}
try {
Just don't want to fail on anything here.
if (printExceptions) {
e.printStackTrace();
}
maybePrintException("waitForBooleanFuture", e);
This is part of the testing harness and is not related to runtime behavior.
@@ -744,6 +745,8 @@ public void testSocketDataPortTimeout() throws Exception {
  ListenerForTesting listener = new ListenerForTesting();
  Options.Builder builder = Options.builder()
  .socketWriteTimeout(5000)
+ .pingInterval(Duration.ofSeconds(1))
Part of what we wanted to test is that pings don't cause a freeze.
@@ -744,6 +745,8 @@ public void testSocketDataPortTimeout() throws Exception {
  ListenerForTesting listener = new ListenerForTesting();
  Options.Builder builder = Options.builder()
  .socketWriteTimeout(5000)
+ .pingInterval(Duration.ofSeconds(1))
+ .maxMessagesInOutgoingQueue(100)
This just lets the test reach the maximum number of queued messages faster than it would with the default of 5000.
LGTM, just some minor comments.
  processException(exp);
  throw exp;
- } catch (Exception exp) { // every thing else
+ } catch (Exception exp) {
Combined because they need to do the same thing and RuntimeException extends Exception.
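The reasoning behind combining the handlers can be illustrated with a small sketch. `CatchOrdering` and `classify` are hypothetical names, and the `IOException` handler is an assumption used only to show that a more specific catch still takes precedence; the excerpt does not show which exception type the preceding catch in the PR handles.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch: a single catch (Exception) also handles every
// RuntimeException, so a separate RuntimeException handler that does the
// same work is redundant.
final class CatchOrdering {
    static String classify(Callable<?> action) {
        try {
            action.call();
            return "ok";
        } catch (IOException exp) {
            return "io";     // the more specific handler is still chosen first
        } catch (Exception exp) {
            return "other";  // reached by RuntimeException too, since it extends Exception
        }
    }
}
```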
# Conflicts:
#   src/main/java/io/nats/client/Connection.java
#   src/test/java/io/nats/client/OptionsTests.java
1. Tuned implementation of forceReconnect.
2. Changed the write timeout watch to call forceReconnect through an executor thread instead of calling it directly, so that it doesn't run on the thread that will end up closing the watch's parent dataPort.
3. Added guards around connecting: a simple atomic flag keeps a thread from entering the connect logic if another thread is already in it. This is probably not strictly necessary, but the cost is minimal and it will help any time multiple threads detect the same failure.
4. Smarter lock acquisition when pushing a message to the outgoing queue, so that many threads don't lose parallelism because they have to wait for the lock.
5. The socket write timeout must be at least 100ms greater than the connection timeout. A shorter write timeout may have been contributing to the cycle of failing to connect. Either way, the logic is that if the developer thinks it could take 10 seconds to connect and has set the connection timeout to account for that, it does not make sense to have a shorter write timeout. This mostly applies while connecting and is probably less of an issue once already connected, but I can't really know which situation we are in. Since the connection timeout is on the order of seconds and the default socket write timeout is 2 minutes, ensuring the socket write timeout is at least 100ms more than the connection timeout seems like a reasonable thing to do.
6. The ping logic that runs on a timer in a separate thread could throw an exception. This is minor and will already be raised to the error listener, so I handled it instead of just letting it pass through to the timer thread, which is just noise.
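Items 3 and 5 above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `ReconnectGuards`, `tryConnect`, and `effectiveWriteTimeout` are hypothetical names, and raising a too-short write timeout to the floor (rather than rejecting it) is an assumption, since the description does not say how the rule is enforced.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of two ideas from the list above.
final class ReconnectGuards {
    // The 100ms margin comes from item 5.
    static final Duration MIN_GAP = Duration.ofMillis(100);

    // Item 5 (assumed enforcement): never return a write timeout shorter
    // than connectionTimeout + 100ms.
    static Duration effectiveWriteTimeout(Duration connectionTimeout, Duration socketWriteTimeout) {
        Duration floor = connectionTimeout.plus(MIN_GAP);
        return socketWriteTimeout.compareTo(floor) < 0 ? floor : socketWriteTimeout;
    }

    private final AtomicBoolean connecting = new AtomicBoolean(false);

    // Item 3: only one caller at a time gets into the connect logic.
    // Returns false immediately if a connect is already in progress.
    boolean tryConnect(Runnable connectLogic) {
        if (!connecting.compareAndSet(false, true)) {
            return false;
        }
        try {
            connectLogic.run();
            return true;
        } finally {
            connecting.set(false); // always release the flag, even on failure
        }
    }
}
```

With these assumptions, a 10 second connection timeout paired with a 5 second write timeout yields an effective write timeout of 10.1 seconds, while the 2 minute default is left unchanged.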