Skip to content

Commit

Permalink
reconnect on connect validation and support (#1012)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 20, 2023
1 parent a95e2b9 commit 07d6b9d
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 30 deletions.
48 changes: 36 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -456,47 +456,71 @@ There are also examples in the [java-nats-examples](https://github.com/nats-io/j

### Connecting

There are four different ways to connect using the Java library:
There are five different ways to connect using the Java library,
each with a parallel method that will allow doing reconnect logic if the initial connect fails.
The ability to reconnect on the initial connection failure is _NOT_ an Options setting.

1. Connect to a local server on the default port:
1. Connect to a local server on the default url. From the Options class: `DEFAULT_URL = "nats://localhost:4222";`

```java
// default options
Connection nc = Nats.connect();

// default options, reconnect on connect
Connection nc = Nats.connectReconnectOnConnect();
```

2. Connect to one or more servers using a URL:
1. Connect to one or more servers using a URL:

```java
//single URL
// single URL, all other default options
Connection nc = Nats.connect("nats://myhost:4222");

//comma-separated list of URLs
// comma-separated list of URLs, all other default options
Connection nc = Nats.connect("nats://myhost:4222,nats://myhost:4223");

// single URL, all other default options, reconnect on connect
Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222");

// comma-separated list of URLs, all other default options, reconnect on connect
Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222,nats://myhost:4223");
```

3. Connect to one or more servers with a custom configuration:
1. Connect to one or more servers with a custom configuration:

```java
Options o = new Options.Builder().server("nats://serverone:4222").server("nats://servertwo:4222").maxReconnects(-1).build();

// custom options
Connection nc = Nats.connect(o);
```

See the javadoc for a complete list of configuration options.
// custom options, reconnect on connect
Connection nc = Nats.connectReconnectOnConnect(o);
```

4. Connect asynchronously, this requires a callback to tell the application when the client is connected:
1. Connect asynchronously, this requires a callback to tell the application when the client is connected:

```java
Options options = new Options.Builder().server(Options.DEFAULT_URL).connectionListener(handler).build();
Nats.connectAsynchronously(options, true);
```

This feature is experimental, please let us know if you like it.

5. Connect with authentication handler:
1. Connect with authentication handler:

```java
AuthHandler authHandler = Nats.credentials(System.getenv("NATS_CREDS"));

// single URL, all other default options
Connection nc = Nats.connect("nats://myhost:4222", authHandler);

// comma-separated list of URLs, all other default options
Connection nc = Nats.connect("nats://myhost:4222,nats://myhost:4223", authHandler);

// single URL, all other default options, reconnect on connect
Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222", authHandler);

// comma-separated list of URLs, all other default options, reconnect on connect
Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222,nats://myhost:4223", authHandler);
```

### Publishing
Expand Down
11 changes: 11 additions & 0 deletions dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

This file lists the dependencies used in this repository.

#### Tool Dependencies

| Dependency | License |
|------------------|--------------------------------------------|
| Java JDK 8 Adopt | GNU General Public License (GPL) Version 2 |
| Gradle 6.8.3 | Apache 2.0 License |

#### Runtime Dependencies

| Dependency | License |
Expand Down Expand Up @@ -51,3 +58,7 @@ This file lists the dependencies used in this repository.
[BSD Licenses](https://en.wikipedia.org/wiki/BSD_licenses)

[MIT License](https://en.wikipedia.org/wiki/MIT_License)

[GNU General Public License, version 2, with the Classpath Exception](https://openjdk.org/legal/gplv2+ce.html)

[Adoption Adopt JDK](https://adoptium.net/about/)
87 changes: 69 additions & 18 deletions src/main/java/io/nats/client/Nats.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public abstract class Nats {
}

/**
* Connect to the default URL, {@link Options#DEFAULT_URL Options.DEFAULT_URL}, with all of the
* Connect to the default URL, {@link Options#DEFAULT_URL Options.DEFAULT_URL}, with all the
* default options.
*
* <p>This is a synchronous call, and the connection should be ready for use on return
Expand All @@ -114,38 +114,51 @@ public abstract class Nats {
* <p>If the connection fails, an IOException is thrown
*
* <p>See {@link Nats#connect(Options) connect(Options)} for more information on exceptions.
*
*
* @return the connection
* @throws IOException if a networking issue occurs
* @throws InterruptedException if the current thread is interrupted
* @return the connection
*/
public static Connection connect() throws IOException, InterruptedException {
Options options = new Options.Builder().server(Options.DEFAULT_URL).build();
return createConnection(options, false);
}

/**
* Connect to the default URL, {@link Options#DEFAULT_URL Options.DEFAULT_URL}, with all the
* default options, allowing re-connect attempts if the initial connection fails
* @return the connection
* @throws IOException if an unrecoverable networking issue occurs
* @throws InterruptedException if the current thread is interrupted
*/
public static Connection connectReconnectOnConnect() throws IOException, InterruptedException {
Options options = new Options.Builder().server(Options.DEFAULT_URL).build();
return createConnection(options, true);
}

/**
* Connect to specific url, with all the default options.
* The Java client generally expects URLs of the form {@code nats://hostname:port}
*
* <p>but also allows urls with a user password {@code nats://user:pass@hostname:port}.
*
* <p>or token in them {@code nats://token@hostname:port}.
*
*
* <p>but also allows urls with a user password {@code nats://user:pass@hostname:port}.</p>
*
* <p>or token in them {@code nats://token@hostname:port}.</p>
*
* <p>Moreover, you can initiate a TLS connection, by using the `tls`
* schema, which will use the default SSLContext, or fail if one is not set. For
* testing and development, the `opentls` schema is support when the server is
* in non-verify mode. In this case, the client will accept any server
* certificate and will not provide one of its own.
*
* certificate and will not provide one of its own.</p>
*
* <p>This is a synchronous call, and the connection should be ready for use on return
* there are network timing issues that could result in a successful connect call but
* the connection is invalid soon after return, where soon is in the network/thread world.
*
* <p>If the connection fails, an IOException is thrown
* the connection is invalid soon after return, where soon is in the network/thread world.</p>
*
* <p>See {@link Nats#connect(Options) connect(Options)} for more information on exceptions.
* <p>If the connection fails, an IOException is thrown</p>
*
* @param url the url of the server, ie. nats://localhost:4222
* <p>See {@link Nats#connect(Options) connect(Options)} for more information on exceptions.</p>
*
* @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223
* @throws IOException if a networking issue occurs
* @throws InterruptedException if the current thread is interrupted
* @return the connection
Expand All @@ -156,7 +169,20 @@ public static Connection connect(String url) throws IOException, InterruptedExce
}

/**
* Connect to the specified URL with the specified username, password and default options.
* Connect to specific url, with all the default options,
* allowing re-connect attempts if the initial connection fails
* @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223
* @return the connection
* @throws IOException if an unrecoverable networking issue occurs
* @throws InterruptedException if the current thread is interrupted
*/
public static Connection connectReconnectOnConnect(String url) throws IOException, InterruptedException {
Options options = new Options.Builder().server(url).build();
return createConnection(options, true);
}

/**
* Connect to the specified URL with the specified auth handler.
*
* <p>This is a synchronous call, and the connection should be ready for use on return
* there are network timing issues that could result in a successful connect call but
Expand All @@ -166,7 +192,7 @@ public static Connection connect(String url) throws IOException, InterruptedExce
*
* <p>See {@link Nats#connect(Options) connect(Options)} for more information on exceptions.
*
* @param url comma separated list of the URLs of the server, ie. nats://localhost:4222,nats://localhost:4223
* @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223
* @param handler the authentication handler implementation
* @return the connection
* @throws IOException if a networking issue occurs
Expand All @@ -177,6 +203,20 @@ public static Connection connect(String url, AuthHandler handler) throws IOExcep
return createConnection(options, false);
}

/**
* Connect to the specified URL with the specified auth handler,
* allowing re-connect attempts if the initial connection fails
* @param url comma separated list of the URLs of the server, i.e. nats://localhost:4222,nats://localhost:4223
* @param handler the authentication handler implementation
* @return the connection
* @throws IOException if an unrecoverable networking issue occurs
* @throws InterruptedException if the current thread is interrupted
*/
public static Connection connectReconnectOnConnect(String url, AuthHandler handler) throws IOException, InterruptedException {
Options options = new Options.Builder().server(url).authHandler(handler).build();
return createConnection(options, true);
}

/**
* Options can be used to set the server URL, or multiple URLS, callback
* handlers for various errors, and connection events.
Expand All @@ -202,14 +242,25 @@ public static Connection connect(String url, AuthHandler handler) throws IOExcep
* again in the future.
*
* @param options the options object to use to create the connection
* @return the connection
* @throws IOException if a networking issue occurs
* @throws InterruptedException if the current thread is interrupted
* @return the connection
*/
public static Connection connect(Options options) throws IOException, InterruptedException {
return createConnection(options, false);
}

/**
* Connect, allowing re-connect attempts if the initial connection fails
* @param options the options object to use to create the connection
* @return the connection
* @throws IOException if an unrecoverable networking issue occurs
* @throws InterruptedException if the current thread is interrupted
*/
public static Connection connectReconnectOnConnect(Options options) throws IOException, InterruptedException {
return createConnection(options, true);
}

/**
* Try to connect in another thread, a connection listener is required to get
* the connection.
Expand Down
73 changes: 73 additions & 0 deletions src/test/java/io/nats/client/ConnectTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.nats.client.ConnectionListener.Events;
import io.nats.client.NatsServerProtocolMock.ExitAt;
import io.nats.client.impl.SimulateSocketDataPortException;
import io.nats.client.impl.TestHandler;
import org.junit.jupiter.api.Test;

Expand All @@ -24,6 +25,7 @@
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.nats.client.utils.TestBase.*;
import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -428,4 +430,75 @@ public void run() {
standardCloseConnection(nc);
}
}

@SuppressWarnings({"unused", "UnusedAssignment", "resource"})
@Test
public void testSocketLevelException() throws Exception {
int port = NatsTestServer.nextPort();

AtomicBoolean simExReceived = new AtomicBoolean();
TestHandler th = new TestHandler();
ErrorListener el = new ErrorListener() {
@Override
public void exceptionOccurred(Connection conn, Exception exp) {
if (exp.getMessage().contains("Simulated Exception")) {
simExReceived.set(true);
}
}
};

Options options = new Options.Builder()
.server(NatsTestServer.getNatsLocalhostUri(port))
.dataPortType("io.nats.client.impl.SimulateSocketDataPortException")
.connectionListener(th)
.errorListener(el)
.reconnectDelayHandler(l -> Duration.ofSeconds(1))
.build();

Connection connection = null;

// 1. DO NOT RECONNECT ON CONNECT
try (NatsTestServer ts = new NatsTestServer(port, false)) {
try {
SimulateSocketDataPortException.THROW_ON_CONNECT.set(true);
connection = Nats.connect(options);
fail();
}
catch (Exception ignore) {}
}

Thread.sleep(200); // just making sure messages get through
assertNull(connection);
assertTrue(simExReceived.get());
simExReceived.set(false);

// 2. RECONNECT ON CONNECT
try (NatsTestServer ts = new NatsTestServer(port, false)) {
try {
SimulateSocketDataPortException.THROW_ON_CONNECT.set(true);
th.prepForStatusChange(Events.RECONNECTED);
connection = Nats.connectReconnectOnConnect(options);
assertTrue(th.waitForStatusChange(5, TimeUnit.SECONDS));
th.prepForStatusChange(Events.DISCONNECTED);
}
catch (Exception e) {
fail("should have connected " + e);
}
}
assertTrue(th.waitForStatusChange(5, TimeUnit.SECONDS));
assertTrue(simExReceived.get());
simExReceived.set(false);

// 2. NORMAL RECONNECT
th.prepForStatusChange(Events.RECONNECTED);
try (NatsTestServer ts = new NatsTestServer(port, false)) {
SimulateSocketDataPortException.THROW_ON_CONNECT.set(true);
try {
assertTrue(th.waitForStatusChange(5, TimeUnit.SECONDS));
}
catch (Exception e) {
fail("should have reconnected " + e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.nats.client.impl;

import io.nats.client.support.NatsUri;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimulateSocketDataPortException extends SocketDataPort {
public static AtomicBoolean THROW_ON_CONNECT = new AtomicBoolean();

@Override
public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws IOException {
if (THROW_ON_CONNECT.get()) {
SimulateSocketDataPortException.THROW_ON_CONNECT.set(false);
throw new ConnectException("Simulated Exception");
}
super.connect(conn, nuri, timeoutNanos);
}
}

0 comments on commit 07d6b9d

Please sign in to comment.