diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index 6eb5e91d8..c7432dd88 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -161,7 +161,10 @@ protected ManageResult manageStatus(Message msg) { case CONFLICT_CODE: // sometimes just a warning String statMsg = status.getMessage(); - if (statMsg.startsWith("Exceeded Max")) { + if (statMsg.startsWith("Exceeded Max") + || statMsg.equals(SERVER_SHUTDOWN) + || statMsg.equals(LEADERSHIP_CHANGE) + ) { if (raiseStatusWarnings) { conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); } @@ -169,15 +172,15 @@ protected ManageResult manageStatus(Message msg) { } if (statMsg.equals(BATCH_COMPLETED) || - statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES) || - statMsg.equals(SERVER_SHUTDOWN)) + statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES)) { return STATUS_TERMINUS; } break; } - // all others are errors + // All unknown 409s are errors, since that basically means the client is not aware of them. + // These known ones are also errors: "Consumer Deleted" and "Consumer is push based" conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status)); return STATUS_ERROR; } diff --git a/src/main/java/io/nats/client/support/Status.java b/src/main/java/io/nats/client/support/Status.java index 8deb6cc5f..ecdf0ab79 100644 --- a/src/main/java/io/nats/client/support/Status.java +++ b/src/main/java/io/nats/client/support/Status.java @@ -41,6 +41,7 @@ public class Status { public static String BATCH_COMPLETED = "Batch Completed"; // 409 informational public static String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers + public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409 private final int code; private final String message; diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java index fec782157..54f56ef88 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java @@ -947,20 +947,21 @@ public void testConsumerDeletedSyncSub() throws Exception { }); } -// This just flaps. It's a timing thing? Already spent too much time, IWOMM and it should work as is. -// @Test -// public void testConsumerDeletedAsyncSub() throws Exception { -// testConflictStatus(409, CONSUMER_DELETED, TYPE_ERROR, "2.9.6", (nc, jsm, js, tsc, handler) -> { -// jsm.addOrUpdateConsumer(tsc.stream, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build()); -// Dispatcher d = nc.createDispatcher(); -// PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, durable(1)); -// JetStreamSubscription sub = js.subscribe(null, d, m -> {}, so); -// sub.pullExpiresIn(1, 30000); -// jsm.deleteConsumer(tsc.stream, durable(1)); -// js.publish(tsc.subject(), null); -// return sub; -// }); -// } + // This just flaps. It's a timing thing? Already spent too much time, IWOMM and it should work as is. + @Test + @Disabled + public void testConsumerDeletedAsyncSub() throws Exception { + testConflictStatus(409, CONSUMER_DELETED, TYPE_ERROR, "2.9.6", (nc, jsm, js, tsc, handler) -> { + jsm.addOrUpdateConsumer(tsc.stream, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build()); + Dispatcher d = nc.createDispatcher(); + PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, durable(1)); + JetStreamSubscription sub = js.subscribe(null, d, m -> {}, so); + sub.pullExpiresIn(1, 30000); + jsm.deleteConsumer(tsc.stream, durable(1)); + js.publish(tsc.subject(), null); + return sub; + }); + } static class BadPullRequestOptions extends PullRequestOptions { public BadPullRequestOptions() { diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index f92f448f5..930773cb3 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.time.Duration; import java.util.List; import java.util.concurrent.TimeUnit; @@ -94,6 +95,10 @@ public interface TwoServerTest { void test(Connection nc1, Connection nc2) throws Exception; } + public interface ThreeServerTest { + void test(Connection nc1, Connection nc2, Connection nc3) throws Exception; + } + public interface VersionCheck { boolean runTest(ServerInfo si); } @@ -319,6 +324,80 @@ public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception } } + public static void runInJsCluster(ThreeServerTest threeServerTest) throws Exception { + int port1 = NatsTestServer.nextPort(); + int port2 = NatsTestServer.nextPort(); + int port3 = NatsTestServer.nextPort(); + int listen1 = NatsTestServer.nextPort(); + int listen2 = NatsTestServer.nextPort(); + int listen3 = NatsTestServer.nextPort(); + String path1 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\"); + String path2 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\"); + String path3 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\"); + String cluster = variant(); + String serverPrefix = variant(); + + String[] server1Inserts = new String[] { + "jetstream {", + " store_dir=" + path1, + "}", + "server_name=" + serverPrefix + "1", + "cluster {", + " name: " + cluster, + " listen: 127.0.0.1:" + listen1, + " routes: [", + " nats-route://127.0.0.1:" + listen2, + " nats-route://127.0.0.1:" + listen3, + " ]", + "}", + }; + + String[] server2Inserts = new String[] { + "jetstream {", + " store_dir=" + path2, + "}", + "server_name=" + serverPrefix + "2", + "cluster {", + " name: " + cluster, + " listen: 127.0.0.1:" + listen2, + " routes: [", + " nats-route://127.0.0.1:" + listen1, + " nats-route://127.0.0.1:" + listen3, + " ]", + "}", + }; + + String[] server3Inserts = new String[] { + "jetstream {", + " store_dir=" + path3, + "}", + "server_name=" + serverPrefix + "3", + "cluster {", + " name: " + cluster, + " listen: 127.0.0.1:" + listen3, + " routes: [", + " nats-route://127.0.0.1:" + listen1, + " nats-route://127.0.0.1:" + listen2, + " ]", + "}", + }; + + try (NatsTestServer srv1 = new NatsTestServer(port1, false, true, null, server1Inserts, null); + Connection nc1 = standardConnection(srv1.getURI()); + NatsTestServer srv2 = new NatsTestServer(port2, false, true, null, server2Inserts, null); + Connection nc2 = standardConnection(srv2.getURI()); + NatsTestServer srv3 = new NatsTestServer(port3, false, true, null, server3Inserts, null); + Connection nc3 = standardConnection(srv3.getURI()) + ) { + try { + threeServerTest.test(nc1, nc2, nc3); + } + finally { + cleanupJs(nc1); + } + } + } + private static void cleanupJs(Connection c) { try {