diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 5a3f7725d..c4c2dcc97 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -227,6 +227,11 @@ public class Options { * {@link Builder#errorListener(ErrorListener) errorListener}. */ public static final String PROP_ERROR_LISTENER = PFX + "callback.error"; + /** + * Property used to configure a builder from a Properties object. {@value}, see + * {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector}. + */ + public static final String PROP_STATISTICS_COLLECTOR = PFX + "statisticscollector"; /** * Property used to configure a builder from a Properties object. {@value}, see {@link Builder#maxPingsOut(int) maxPingsOut}. */ @@ -556,6 +561,7 @@ public class Options { private final ErrorListener errorListener; private final ConnectionListener connectionListener; + private final StatisticsCollector statisticsCollector; private final String dataPortType; private final boolean trackAdvancedStats; @@ -661,6 +667,7 @@ public static class Builder { private ErrorListener errorListener = null; private ConnectionListener connectionListener = null; + private StatisticsCollector statisticsCollector = null; private String dataPortType = DEFAULT_DATA_PORT_TYPE; private ExecutorService executor; private List> httpRequestInterceptors; @@ -762,6 +769,7 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_ERROR_LISTENER, o -> this.errorListener = (ErrorListener) o); classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o); + classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o); stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s); stringProperty(props, PROP_INBOX_PREFIX, this::inboxPrefix); @@ -1327,6 +1335,19 @@ public Builder connectionListener(ConnectionListener listener) { return this; } + /** + * Set the {@link StatisticsCollector StatisticsCollector} to collect connection metrics. + *

+ * If not set, then a default implementation will be used. + * + * @param collector the new StatisticsCollector for this connection. + * @return the Builder for chaining + */ + public Builder statisticsCollector(StatisticsCollector collector) { + this.statisticsCollector = collector; + return this; + } + /** * Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a * cached thread pool that names threads after the connection name (or a default). This executor @@ -1532,6 +1553,7 @@ else if (useDefaultTls) { new SynchronousQueue<>(), new DefaultThreadFactory(threadPrefix)); } + return new Options(this); } @@ -1580,6 +1602,7 @@ public Builder(Options o) { this.errorListener = o.errorListener; this.connectionListener = o.connectionListener; + this.statisticsCollector = o.statisticsCollector; this.dataPortType = o.dataPortType; this.trackAdvancedStats = o.trackAdvancedStats; this.executor = o.executor; @@ -1638,6 +1661,7 @@ private Options(Builder b) { this.errorListener = b.errorListener == null ? new ErrorListenerLoggerImpl() : b.errorListener; this.connectionListener = b.connectionListener; + this.statisticsCollector = b.statisticsCollector; this.dataPortType = b.dataPortType; this.trackAdvancedStats = b.trackAdvancedStats; this.executor = b.executor; @@ -1689,6 +1713,13 @@ public ConnectionListener getConnectionListener() { return this.connectionListener; } + /** + * @return the statistics collector, or null, see {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector()} in the builder doc + */ + public StatisticsCollector getStatisticsCollector() { + return this.statisticsCollector; + } + /** * @return the auth handler, or null, see {@link Builder#authHandler(AuthHandler) authHandler()} in the builder doc */ diff --git a/src/main/java/io/nats/client/Statistics.java b/src/main/java/io/nats/client/Statistics.java index fb518f971..73957e9cc 100644 --- a/src/main/java/io/nats/client/Statistics.java +++ b/src/main/java/io/nats/client/Statistics.java @@ -20,33 +20,88 @@ *

The Statistics toString() provides a summary of the statistics. */ public interface Statistics { + + /** + * @return the total number of pings that have been sent from this connection. + */ + long getPings(); + + /** + * @return the total number of times this connection has tried to reconnect. + */ + long getReconnects(); + + /** + * @return the total number of messages dropped by this connection across all slow consumers. + */ + long getDroppedCount(); + + /** + * @return the total number of op +OKs received by this connection. + */ + long getOKs(); + + /** + * @return the total number of op -ERRs received by this connection. + */ + long getErrs(); + + /** + * @return the total number of exceptions seen by this connection. + */ + long getExceptions(); + + /** + * @return the total number of requests sent by this connection. + */ + long getRequestsSent(); + + /** + * @return the total number of replies received by this connection. + */ + long getRepliesReceived(); + + /** + * @return the total number of duplicate replies received by this connection. + * + * NOTE: This is only counted if advanced stats are enabled. + */ + long getDuplicateRepliesReceived(); + + /** + * @return the total number of orphan replies received by this connection. + * + * NOTE: This is only counted if advanced stats are enabled. + */ + long getOrphanRepliesReceived(); + /** * @return the total number of messages that have come in to this connection. */ - public long getInMsgs(); + long getInMsgs(); /** * @return the total number of messages that have gone out of this connection. */ - public long getOutMsgs(); + long getOutMsgs(); /** * @return the total number of message bytes that have come in to this connection. */ - public long getInBytes(); + long getInBytes(); /** - * @return the total number of message bytes that have gone out of to this connection. + * @return the total number of message bytes that have gone out of this connection. */ - public long getOutBytes(); + long getOutBytes(); /** - * @return the total number of times this connection has tried to reconnect. + * @return the total number of outgoing message flushes by this connection. */ - public long getReconnects(); + long getFlushCounter(); /** - * @return the total number of messages dropped by this connection across all slow consumers. + * @return the count of outstanding of requests from this connection. */ - public long getDroppedCount(); + long getOutstandingRequests(); } diff --git a/src/main/java/io/nats/client/StatisticsCollector.java b/src/main/java/io/nats/client/StatisticsCollector.java new file mode 100644 index 000000000..b7de6514d --- /dev/null +++ b/src/main/java/io/nats/client/StatisticsCollector.java @@ -0,0 +1,131 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +/** + * A collector for connection metrics. + *

+ * Information about key metrics is incremented on this collector by the connection. + *

+ * See {@link Statistics} for accessing the collected metrics. + */ +public interface StatisticsCollector extends Statistics { + /** + * Sets whether advanced stats are/should be tracked. + */ + void setAdvancedTracking(boolean trackAdvanced); + + /** + * Increments the total number of pings that have been sent from this connection. + */ + void incrementPingCount(); + + /** + * Increments the total number of times this connection has tried to reconnect. + */ + void incrementReconnects(); + + /** + * Increments the total number of messages dropped by this connection across all slow consumers. + */ + void incrementDroppedCount(); + + /** + * Increments the total number of op +OKs received by this connection. + */ + void incrementOkCount(); + + /** + * Increments the total number of op -ERRs received by this connection. + */ + void incrementErrCount(); + + /** + * Increments the total number of exceptions seen by this connection. + */ + void incrementExceptionCount(); + + /** + * Increments the total number of requests sent by this connection. + */ + void incrementRequestsSent(); + + /** + * Increments the total number of replies received by this connection. + */ + void incrementRepliesReceived(); + + /** + * Increments the total number of duplicate replies received by this connection. + *

+ * NOTE: This is only counted if advanced stats are enabled. + */ + void incrementDuplicateRepliesReceived(); + + /** + * Increments the total number of orphan replies received by this connection. + *

+ * NOTE: This is only counted if advanced stats are enabled. + */ + void incrementOrphanRepliesReceived(); + + /** + * Increments the total number of messages that have come in to this connection. + */ + void incrementInMsgs(); + + /** + * Increments the total number of messages that have gone out of this connection. + */ + void incrementOutMsgs(); + + /** + * Increment the total number of message bytes that have come in to this connection. + */ + void incrementInBytes(long bytes); + + /** + * Increment the total number of message bytes that have gone out of this connection. + */ + void incrementOutBytes(long bytes); + + /** + * Increment the total number of outgoing message flushes by this connection. + */ + void incrementFlushCounter(); + + /** + * Increments the count of outstanding of requests from this connection. + */ + void incrementOutstandingRequests(); + + /** + * Decrements the count of outstanding of requests from this connection. + */ + void decrementOutstandingRequests(); + + /** + * Registers a Socket read by this connection. + *

+ * NOTE: Implementations should only count this if advanced stats are enabled. + */ + void registerRead(long bytes); + + /** + * Registers a Socket write by this connection. + *

+ * NOTE: Implementations should only count this if advanced stats are enabled. + */ + void registerWrite(long bytes); +} diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index dd2a34df2..8bb4a9599 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -50,7 +50,7 @@ class NatsConnection implements Connection { private final Options options; - private final NatsStatistics statistics; + private final StatisticsCollector statistics; private boolean connecting; // you can only connect in one thread private boolean disconnecting; // you can only disconnect in one thread @@ -111,7 +111,8 @@ class NatsConnection implements Connection { this.options = options; advancedTracking = options.isTrackAdvancedStats(); - this.statistics = new NatsStatistics(advancedTracking); + this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector(); + this.statistics.setAdvancedTracking(advancedTracking); this.statusLock = new ReentrantLock(); this.statusChanged = this.statusLock.newCondition(); @@ -1231,11 +1232,12 @@ void deliverReply(Message msg) { statistics.incrementRepliesReceived(); } else if (!oldStyle && !subject.startsWith(mainInbox)) { - if (advancedTracking && responsesRespondedTo.get(key) != null) { - statistics.incrementDuplicateRepliesReceived(); - } - else { - statistics.incrementOrphanRepliesReceived(); + if (advancedTracking) { + if (responsesRespondedTo.get(key) != null) { + statistics.incrementDuplicateRepliesReceived(); + } else { + statistics.incrementOrphanRepliesReceived(); + } } } } @@ -1719,7 +1721,7 @@ public Statistics getStatistics() { return this.statistics; } - NatsStatistics getNatsStatistics() { + StatisticsCollector getNatsStatistics() { return this.statistics; } diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index d1207d740..2e8e21a54 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -14,6 +14,7 @@ package io.nats.client.impl; import io.nats.client.Options; +import io.nats.client.StatisticsCollector; import java.io.IOException; import java.nio.BufferOverflowException; @@ -108,7 +109,7 @@ Future stop() { return this.stopped; } - synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, NatsStatistics stats) + synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException { int sendPosition = 0; @@ -167,7 +168,7 @@ public void run() { try { dataPort = this.dataPortFuture.get(); // Will wait for the future to complete - NatsStatistics stats = this.connection.getNatsStatistics(); + StatisticsCollector stats = this.connection.getNatsStatistics(); int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER; while (this.running.get()) { diff --git a/src/main/java/io/nats/client/impl/NatsImpl.java b/src/main/java/io/nats/client/impl/NatsImpl.java index 635300d43..ddb701725 100644 --- a/src/main/java/io/nats/client/impl/NatsImpl.java +++ b/src/main/java/io/nats/client/impl/NatsImpl.java @@ -31,7 +31,7 @@ public static Connection createConnection(Options options, boolean reconnectOnCo } public static Statistics createEmptyStats() { - return new NatsStatistics(false); + return new NatsStatistics(); } /** @@ -72,4 +72,4 @@ public static AuthHandler staticCredentials(char[] jwt, char[] nkey) { return new StringAuthHandler(jwt, nkey); } -} \ No newline at end of file +} diff --git a/src/main/java/io/nats/client/impl/NatsStatistics.java b/src/main/java/io/nats/client/impl/NatsStatistics.java index e99243f60..485788f89 100644 --- a/src/main/java/io/nats/client/impl/NatsStatistics.java +++ b/src/main/java/io/nats/client/impl/NatsStatistics.java @@ -14,14 +14,17 @@ package io.nats.client.impl; import io.nats.client.Statistics; +import io.nats.client.StatisticsCollector; import java.text.NumberFormat; import java.util.LongSummaryStatistics; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -class NatsStatistics implements Statistics { - private ReentrantLock lock; +class NatsStatistics implements Statistics, StatisticsCollector { + private final ReentrantLock readStatsLock; + private final ReentrantLock writeStatsLock; + private LongSummaryStatistics readStats; private LongSummaryStatistics writeStats; @@ -42,14 +45,15 @@ class NatsStatistics implements Statistics { private AtomicLong exceptionCount; private AtomicLong droppedCount; - final private boolean trackAdvanced; + private boolean trackAdvanced; + + public NatsStatistics() { + this.readStatsLock = new ReentrantLock(); + this.writeStatsLock = new ReentrantLock(); - public NatsStatistics(boolean trackAdvanced) { - this.trackAdvanced = trackAdvanced; this.readStats = new LongSummaryStatistics(); this.writeStats = new LongSummaryStatistics(); - this.lock = new ReentrantLock(); this.flushCounter = new AtomicLong(); this.outstandingRequests = new AtomicLong(); this.requestsSent = new AtomicLong(); @@ -68,153 +72,198 @@ public NatsStatistics(boolean trackAdvanced) { this.droppedCount = new AtomicLong(); } - void incrementPingCount() { + @Override + public void setAdvancedTracking(boolean trackAdvanced) { + this.trackAdvanced = trackAdvanced; + } + + @Override + public void incrementPingCount() { this.pingCount.incrementAndGet(); } - void incrementDroppedCount() { + @Override + public void incrementDroppedCount() { this.droppedCount.incrementAndGet(); } - void incrementOkCount() { + @Override + public void incrementOkCount() { this.okCount.incrementAndGet(); } - void incrementErrCount() { + @Override + public void incrementErrCount() { this.errCount.incrementAndGet(); } - void incrementExceptionCount() { + @Override + public void incrementExceptionCount() { this.exceptionCount.incrementAndGet(); } - void incrementRequestsSent() { + @Override + public void incrementRequestsSent() { this.requestsSent.incrementAndGet(); } - void incrementRepliesReceived() { + @Override + public void incrementRepliesReceived() { this.repliesReceived.incrementAndGet(); } - void incrementDuplicateRepliesReceived() { + @Override + public void incrementDuplicateRepliesReceived() { this.duplicateRepliesReceived.incrementAndGet(); } - void incrementOrphanRepliesReceived() { + @Override + public void incrementOrphanRepliesReceived() { this.orphanRepliesReceived.incrementAndGet(); } - void incrementReconnects() { + @Override + public void incrementReconnects() { this.reconnects.incrementAndGet(); } - void incrementInMsgs() { + @Override + public void incrementInMsgs() { this.inMsgs.incrementAndGet(); } - void incrementOutMsgs() { + @Override + public void incrementOutMsgs() { this.outMsgs.incrementAndGet(); } - void incrementInBytes(long bytes) { + @Override + public void incrementInBytes(long bytes) { this.inBytes.addAndGet(bytes); } - void incrementOutMsgsAndBytes(long bytes) { - incrementOutMsgs(); - incrementOutBytes(bytes); - } - - void incrementOutBytes(long bytes) { + @Override + public void incrementOutBytes(long bytes) { this.outBytes.addAndGet(bytes); } - void incrementFlushCounter() { + @Override + public void incrementFlushCounter() { this.flushCounter.incrementAndGet(); } - void incrementOutstandingRequests() { + @Override + public void incrementOutstandingRequests() { this.outstandingRequests.incrementAndGet(); } - void decrementOutstandingRequests() { + @Override + public void decrementOutstandingRequests() { this.outstandingRequests.decrementAndGet(); } - void registerSummaryStat(LongSummaryStatistics stats, long value) { - if(!trackAdvanced) { + @Override + public void registerRead(long bytes) { + if (!trackAdvanced) { return; } - lock.lock(); + + readStatsLock.lock(); try { - stats.accept(value); + readStats.accept(bytes); } finally { - lock.unlock(); + readStatsLock.unlock(); } } - void registerRead(long bytes) { - registerSummaryStat(readStats, bytes); - } + @Override + public void registerWrite(long bytes) { + if (!trackAdvanced) { + return; + } - void registerWrite(long bytes) { - registerSummaryStat(writeStats, bytes); + writeStatsLock.lock(); + try { + writeStats.accept(bytes); + } finally { + writeStatsLock.unlock(); + } } + @Override public long getPings() { return this.pingCount.get(); } + @Override public long getDroppedCount() { return this.droppedCount.get(); } + @Override public long getOKs() { return this.okCount.get(); } + @Override public long getErrs() { return this.errCount.get(); } + @Override public long getExceptions() { return this.exceptionCount.get(); } + @Override + public long getRequestsSent() { + return this.requestsSent.get(); + } + + @Override public long getReconnects() { return this.reconnects.get(); } + @Override public long getInMsgs() { return this.inMsgs.get(); } + @Override public long getOutMsgs() { return this.outMsgs.get(); } + @Override public long getInBytes() { return this.inBytes.get(); } + @Override public long getOutBytes() { return this.outBytes.get(); } - long getFlushCounter() { + @Override + public long getFlushCounter() { return flushCounter.get(); } - long getOutstandingRequests() { + @Override + public long getOutstandingRequests() { return outstandingRequests.get(); } + @Override public long getRepliesReceived() { return repliesReceived.get(); } + @Override public long getDuplicateRepliesReceived() { return duplicateRepliesReceived.get(); } + @Override public long getOrphanRepliesReceived() { return orphanRepliesReceived.get(); } void appendNumberStat(StringBuilder builder, String name, long value) { @@ -232,49 +281,54 @@ void appendNumberStat(StringBuilder builder, String name, double value) { public String toString() { StringBuilder builder = new StringBuilder(); - lock.lock(); - try { - builder.append("### Connection ###\n"); - appendNumberStat(builder, "Reconnects: ", this.reconnects.get()); - if (this.trackAdvanced) { - appendNumberStat(builder, "Requests Sent: ", this.requestsSent.get()); - appendNumberStat(builder, "Replies Received: ", this.repliesReceived.get()); - appendNumberStat(builder, "Duplicate Replies Received: ", this.duplicateRepliesReceived.get()); - appendNumberStat(builder, "Orphan Replies Received: ", this.orphanRepliesReceived.get()); - appendNumberStat(builder, "Pings Sent: ", this.pingCount.get()); - appendNumberStat(builder, "+OKs Received: ", this.okCount.get()); - appendNumberStat(builder, "-Errs Received: ", this.errCount.get()); - appendNumberStat(builder, "Handled Exceptions: ", this.exceptionCount.get()); - appendNumberStat(builder, "Successful Flush Calls: ", this.flushCounter.get()); - appendNumberStat(builder, "Outstanding Request Futures: ", this.outstandingRequests.get()); - appendNumberStat(builder, "Dropped Messages: ", this.droppedCount.get()); - } - builder.append("\n"); - builder.append("### Reader ###\n"); - appendNumberStat(builder, "Messages in: ", this.inMsgs.get()); - appendNumberStat(builder, "Bytes in: ", this.inBytes.get()); - builder.append("\n"); - if (this.trackAdvanced) { + builder.append("### Connection ###\n"); + appendNumberStat(builder, "Reconnects: ", this.reconnects.get()); + appendNumberStat(builder, "Requests Sent: ", this.requestsSent.get()); + appendNumberStat(builder, "Replies Received: ", this.repliesReceived.get()); + if (this.trackAdvanced) { + appendNumberStat(builder, "Duplicate Replies Received: ", this.duplicateRepliesReceived.get()); + appendNumberStat(builder, "Orphan Replies Received: ", this.orphanRepliesReceived.get()); + } + appendNumberStat(builder, "Pings Sent: ", this.pingCount.get()); + appendNumberStat(builder, "+OKs Received: ", this.okCount.get()); + appendNumberStat(builder, "-Errs Received: ", this.errCount.get()); + appendNumberStat(builder, "Handled Exceptions: ", this.exceptionCount.get()); + appendNumberStat(builder, "Successful Flush Calls: ", this.flushCounter.get()); + appendNumberStat(builder, "Outstanding Request Futures: ", this.outstandingRequests.get()); + appendNumberStat(builder, "Dropped Messages: ", this.droppedCount.get()); + builder.append("\n"); + builder.append("### Reader ###\n"); + appendNumberStat(builder, "Messages in: ", this.inMsgs.get()); + appendNumberStat(builder, "Bytes in: ", this.inBytes.get()); + builder.append("\n"); + if (this.trackAdvanced) { + readStatsLock.lock(); + try { appendNumberStat(builder, "Socket Reads: ", readStats.getCount()); appendNumberStat(builder, "Average Bytes Per Read: ", readStats.getAverage()); appendNumberStat(builder, "Min Bytes Per Read: ", readStats.getMin()); appendNumberStat(builder, "Max Bytes Per Read: ", readStats.getMax()); + } finally { + readStatsLock.unlock(); } - builder.append("\n"); - builder.append("### Writer ###\n"); - appendNumberStat(builder, "Messages out: ", this.outMsgs.get()); - appendNumberStat(builder, "Bytes out: ", this.outBytes.get()); - builder.append("\n"); - if (this.trackAdvanced) { + } + builder.append("\n"); + builder.append("### Writer ###\n"); + appendNumberStat(builder, "Messages out: ", this.outMsgs.get()); + appendNumberStat(builder, "Bytes out: ", this.outBytes.get()); + builder.append("\n"); + if (this.trackAdvanced) { + writeStatsLock.lock(); + try { appendNumberStat(builder, "Socket Writes: ", writeStats.getCount()); appendNumberStat(builder, "Average Bytes Per Write: ", writeStats.getAverage()); appendNumberStat(builder, "Min Bytes Per Write: ", writeStats.getMin()); appendNumberStat(builder, "Max Bytes Per Write: ", writeStats.getMax()); + } finally { + writeStatsLock.unlock(); } - } finally { - lock.unlock(); } return builder.toString(); } -} \ No newline at end of file +} diff --git a/src/test/java/io/nats/client/OptionsTests.java b/src/test/java/io/nats/client/OptionsTests.java index 4cbebdc01..38187fc4b 100644 --- a/src/test/java/io/nats/client/OptionsTests.java +++ b/src/test/java/io/nats/client/OptionsTests.java @@ -17,6 +17,7 @@ import io.nats.client.impl.DataPort; import io.nats.client.impl.ErrorListenerLoggerImpl; import io.nats.client.impl.TestHandler; +import io.nats.client.impl.TestStatisticsCollector; import io.nats.client.support.HttpRequest; import io.nats.client.support.NatsUri; import io.nats.client.utils.CloseOnUpgradeAttempt; @@ -96,6 +97,7 @@ private static void _testDefaultOptions(Options o) { assertTrue(o.getErrorListener() instanceof ErrorListenerLoggerImpl, "error handler"); assertNull(o.getConnectionListener(), "disconnect handler"); + assertNull(o.getStatisticsCollector(), "statistics collector"); assertFalse(o.isOldRequestStyle(), "default oldstyle"); } @@ -261,6 +263,20 @@ private static void _testChainedConnectionListener(ConnectionListener cHandler, assertSame(cHandler, o.getConnectionListener(), "chained connection handler"); } + @Test + public void testChainedStatisticsCollector() { + StatisticsCollector cHandler = new TestStatisticsCollector(); + Options o = new Options.Builder().statisticsCollector(cHandler).build(); + _testChainedStatisticsCollector(cHandler, o); + _testChainedStatisticsCollector(cHandler, new Options.Builder(o).build()); + } + + private static void _testChainedStatisticsCollector(StatisticsCollector cHandler, Options o) { + assertFalse(o.isVerbose(), "default verbose"); // One from a different type + assertTrue(o.getStatisticsCollector() instanceof TestStatisticsCollector, "statistics collector"); + assertSame(cHandler, o.getStatisticsCollector(), "chained statistics collector"); + } + @Test public void testPropertiesBooleanBuilder() { Properties props = new Properties(); @@ -566,6 +582,19 @@ public void testPropertyConnectionListeners() { assertEquals(((TestHandler) o.getConnectionListener()).getCount(), 3, "property connect handler class"); } + @Test + public void testPropertyStatisticsCollector() { + Properties props = new Properties(); + props.setProperty(Options.PROP_STATISTICS_COLLECTOR, TestStatisticsCollector.class.getCanonicalName()); + + Options o = new Options.Builder(props).build(); + assertFalse(o.isVerbose(), "default verbose"); // One from a different type + assertNotNull(o.getStatisticsCollector(), "property statistics collector"); + + o.getStatisticsCollector().incrementOutMsgs(); + assertEquals(o.getStatisticsCollector().getOutMsgs(), 1, "property statistics collector class"); + } + @Test public void testChainOverridesProperties() { Properties props = new Properties(); @@ -762,6 +791,15 @@ public void testBadClassInPropertyConnectionListeners() { }); } + @Test + public void testBadClassInPropertyStatisticsCollector() { + assertThrows(IllegalArgumentException.class, () -> { + Properties props = new Properties(); + props.setProperty(Options.PROP_STATISTICS_COLLECTOR, "foo"); + new Options.Builder(props); + }); + } + @Test public void testTokenAndUserThrows() { assertThrows(IllegalStateException.class, @@ -994,4 +1032,4 @@ public void testThrowOnBadContextSecureProp() { } } */ -} \ No newline at end of file +} diff --git a/src/test/java/io/nats/client/impl/NatsStatisticsTests.java b/src/test/java/io/nats/client/impl/NatsStatisticsTests.java index 3e4222b6f..9d38df92f 100644 --- a/src/test/java/io/nats/client/impl/NatsStatisticsTests.java +++ b/src/test/java/io/nats/client/impl/NatsStatisticsTests.java @@ -19,7 +19,9 @@ import java.time.Duration; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; public class NatsStatisticsTests { @@ -57,7 +59,7 @@ public void testInOutOKRequestStats() throws Exception { try (NatsTestServer ts = new NatsTestServer(false)) { Options options = new Options.Builder().server(ts.getURI()).verbose().build(); Connection nc = Nats.connect(options); - NatsStatistics stats = ((NatsConnection) nc).getNatsStatistics(); + StatisticsCollector stats = ((NatsConnection) nc).getNatsStatistics(); try { assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); @@ -92,4 +94,201 @@ public void testInOutOKRequestStats() throws Exception { } } } -} \ No newline at end of file + + @Test + public void testReadWriteAdvancedStatsEnabled() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false)) { + Options options = new Options.Builder().server(ts.getURI()).verbose().turnOnAdvancedStats().build(); + Connection nc = Nats.connect(options); + StatisticsCollector stats = ((NatsConnection) nc).getNatsStatistics(); + + try { + assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); + + Dispatcher d = nc.createDispatcher((msg) -> { + Message m = NatsMessage.builder() + .subject(msg.getReplyTo()) + .data(new byte[16]) + .headers(new Headers().put("header", "reply")) + .build(); + nc.publish(m); + }); + d.subscribe("subject"); + + Message m = NatsMessage.builder() + .subject("subject") + .data(new byte[8]) + .headers(new Headers().put("header", "request")) + .build(); + Future incoming = nc.request(m); + Message msg = incoming.get(500, TimeUnit.MILLISECONDS); + + assertNotNull(msg); + + // The read/write advanced stats are only exposed via toString, so assert on that + String stringStats = stats.toString(); + assertTrue(stringStats.contains("Socket Reads"), "readStats count"); + assertTrue(stringStats.contains("Average Bytes Per Read"), "readStats average bytes"); + assertTrue(stringStats.contains("Min Bytes Per Read"), "readStats min bytes"); + assertTrue(stringStats.contains("Max Bytes Per Read"), "readStats max bytes"); + + assertTrue(stringStats.contains("Socket Writes"), "writeStats count"); + assertTrue(stringStats.contains("Average Bytes Per Write"), "writeStats average bytes"); + assertTrue(stringStats.contains("Min Bytes Per Write"), "writeStats min bytes"); + assertTrue(stringStats.contains("Max Bytes Per Write"), "writeStats max bytes"); + } finally { + nc.close(); + } + } + } + + @Test + public void testReadWriteAdvancedStatsDisabled() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false)) { + Options options = new Options.Builder().server(ts.getURI()).verbose().build(); + Connection nc = Nats.connect(options); + StatisticsCollector stats = ((NatsConnection) nc).getNatsStatistics(); + + try { + assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); + + Dispatcher d = nc.createDispatcher((msg) -> { + Message m = NatsMessage.builder() + .subject(msg.getReplyTo()) + .data(new byte[16]) + .headers(new Headers().put("header", "reply")) + .build(); + nc.publish(m); + }); + d.subscribe("subject"); + + Message m = NatsMessage.builder() + .subject("subject") + .data(new byte[8]) + .headers(new Headers().put("header", "request")) + .build(); + Future incoming = nc.request(m); + Message msg = incoming.get(500, TimeUnit.MILLISECONDS); + + assertNotNull(msg); + + incoming = nc.request(m); + incoming.get(500, TimeUnit.MILLISECONDS); + + assertNotNull(msg); + + incoming = nc.request(m); + incoming.get(500, TimeUnit.MILLISECONDS); + + assertNotNull(msg); + + incoming = nc.request(m); + incoming.get(500, TimeUnit.MILLISECONDS); + + assertNotNull(msg); + + // The read/write advanced stats are only exposed via toString, so assert on that + String stringStats = stats.toString(); + assertFalse(stringStats.contains("Socket Reads"), "readStats count"); + assertFalse(stringStats.contains("Average Bytes Per Read"), "readStats average bytes"); + assertFalse(stringStats.contains("Min Bytes Per Read"), "readStats min bytes"); + assertFalse(stringStats.contains("Max Bytes Per Read"), "readStats max bytes"); + + assertFalse(stringStats.contains("Socket Writes"), "writeStats count"); + assertFalse(stringStats.contains("Average Bytes Per Write"), "writeStats average bytes"); + assertFalse(stringStats.contains("Min Bytes Per Write"), "writeStats min bytes"); + assertFalse(stringStats.contains("Max Bytes Per Write"), "writeStats max bytes"); + } finally { + nc.close(); + } + } + } + + @Test + public void testOrphanDuplicateRepliesAdvancedStatsEnabled() throws Exception { + Options.Builder builder = new Options.Builder().turnOnAdvancedStats(); + + runInServer(builder, nc -> { + AtomicInteger requests = new AtomicInteger(); + MessageHandler handler = (msg) -> { + requests.incrementAndGet(); + nc.publish(msg.getReplyTo(), null); + }; + Dispatcher d1 = nc.createDispatcher(handler); + Dispatcher d2 = nc.createDispatcher(handler); + Dispatcher d3 = nc.createDispatcher(handler); + Dispatcher d4 = nc.createDispatcher(msg -> { + sleep(5000); + handler.onMessage(msg); + }); + d1.subscribe(SUBJECT); + d2.subscribe(SUBJECT); + d3.subscribe(SUBJECT); + d4.subscribe(SUBJECT); + + Message reply = nc.request(SUBJECT, null, Duration.ofSeconds(2)); + assertNotNull(reply); + sleep(2000); + assertEquals(3, requests.get()); + NatsStatistics stats = (NatsStatistics) nc.getStatistics(); + assertEquals(1, stats.getRepliesReceived()); + assertEquals(2, stats.getDuplicateRepliesReceived()); + assertEquals(0, stats.getOrphanRepliesReceived()); + + sleep(3100); + assertEquals(4, requests.get()); + stats = (NatsStatistics) nc.getStatistics(); + assertEquals(1, stats.getRepliesReceived()); + assertEquals(2, stats.getDuplicateRepliesReceived()); + assertEquals(1, stats.getOrphanRepliesReceived()); + + String stringStats = stats.toString(); + assertTrue(stringStats.contains("Duplicate Replies Received"), "duplicate replies"); + assertTrue(stringStats.contains("Orphan Replies Received"), "orphan replies"); + }); + } + + @Test + public void testOrphanDuplicateRepliesAdvancedStatsDisabled() throws Exception { + Options.Builder builder = new Options.Builder(); + + runInServer(builder, nc -> { + AtomicInteger requests = new AtomicInteger(); + MessageHandler handler = (msg) -> { + requests.incrementAndGet(); + nc.publish(msg.getReplyTo(), null); + }; + Dispatcher d1 = nc.createDispatcher(handler); + Dispatcher d2 = nc.createDispatcher(handler); + Dispatcher d3 = nc.createDispatcher(handler); + Dispatcher d4 = nc.createDispatcher(msg -> { + sleep(5000); + handler.onMessage(msg); + }); + d1.subscribe(SUBJECT); + d2.subscribe(SUBJECT); + d3.subscribe(SUBJECT); + d4.subscribe(SUBJECT); + + Message reply = nc.request(SUBJECT, null, Duration.ofSeconds(2)); + assertNotNull(reply); + sleep(2000); + assertEquals(3, requests.get()); + NatsStatistics stats = (NatsStatistics) nc.getStatistics(); + assertEquals(1, stats.getRepliesReceived()); + assertEquals(0, stats.getDuplicateRepliesReceived()); + assertEquals(0, stats.getOrphanRepliesReceived()); + + sleep(3100); + assertEquals(4, requests.get()); + stats = (NatsStatistics) nc.getStatistics(); + assertEquals(1, stats.getRepliesReceived()); + assertEquals(0, stats.getDuplicateRepliesReceived()); + assertEquals(0, stats.getOrphanRepliesReceived()); + + String stringStats = stats.toString(); + assertFalse(stringStats.contains("Duplicate Replies Received"), "duplicate replies"); + assertFalse(stringStats.contains("Orphan Replies Received"), "orphan replies"); + }); + } +} diff --git a/src/test/java/io/nats/client/impl/PingTests.java b/src/test/java/io/nats/client/impl/PingTests.java index 7e5eb7b5a..45f1364c9 100644 --- a/src/test/java/io/nats/client/impl/PingTests.java +++ b/src/test/java/io/nats/client/impl/PingTests.java @@ -72,7 +72,7 @@ public void testPingTimer() throws Exception { try (NatsTestServer ts = new NatsTestServer(false)) { Options options = new Options.Builder().server(ts.getURI()).pingInterval(Duration.ofMillis(5)).build(); NatsConnection nc = (NatsConnection) Nats.connect(options); - NatsStatistics stats = nc.getNatsStatistics(); + StatisticsCollector stats = nc.getNatsStatistics(); try { assertTrue(Connection.Status.CONNECTED == nc.getStatus(), "Connected Status"); @@ -189,7 +189,7 @@ public void testPingTimerThroughReconnect() throws Exception { server(ts2.getURI()). pingInterval(Duration.ofMillis(5)).build(); NatsConnection nc = (NatsConnection) Nats.connect(options); - NatsStatistics stats = nc.getNatsStatistics(); + StatisticsCollector stats = nc.getNatsStatistics(); try { assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); @@ -223,7 +223,7 @@ public void testMessagesDelayPings() throws Exception, ExecutionException, Timeo Options options = new Options.Builder().server(ts.getURI()). pingInterval(Duration.ofMillis(200)).build(); NatsConnection nc = (NatsConnection) Nats.connect(options); - NatsStatistics stats = nc.getNatsStatistics(); + StatisticsCollector stats = nc.getNatsStatistics(); try { final CompletableFuture done = new CompletableFuture<>(); @@ -269,4 +269,4 @@ public void testRtt() throws Exception { assertThrows(IOException.class, nc::RTT); }); } -} \ No newline at end of file +} diff --git a/src/test/java/io/nats/client/impl/TestStatisticsCollector.java b/src/test/java/io/nats/client/impl/TestStatisticsCollector.java new file mode 100644 index 000000000..9f559378a --- /dev/null +++ b/src/test/java/io/nats/client/impl/TestStatisticsCollector.java @@ -0,0 +1,203 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.StatisticsCollector; + +import java.util.concurrent.atomic.AtomicLong; + +public class TestStatisticsCollector implements StatisticsCollector { + + private final AtomicLong outMsgs = new AtomicLong(); + + @Override + public void setAdvancedTracking(boolean trackAdvanced) { + + } + + @Override + public long getPings() { + return 0; + } + + @Override + public long getReconnects() { + return 0; + } + + @Override + public long getDroppedCount() { + return 0; + } + + @Override + public long getOKs() { + return 0; + } + + @Override + public long getErrs() { + return 0; + } + + @Override + public long getExceptions() { + return 0; + } + + @Override + public long getRequestsSent() { + return 0; + } + + @Override + public long getInMsgs() { + return 0; + } + + @Override + public long getOutMsgs() { + return outMsgs.get(); + } + + @Override + public long getInBytes() { + return 0; + } + + @Override + public long getOutBytes() { + return 0; + } + + @Override + public long getFlushCounter() { + return 0; + } + + @Override + public long getOutstandingRequests() { + return 0; + } + + @Override + public long getRepliesReceived() { + return 0; + } + + @Override + public long getDuplicateRepliesReceived() { + return 0; + } + + @Override + public long getOrphanRepliesReceived() { + return 0; + } + + @Override + public void incrementPingCount() { + + } + + @Override + public void incrementReconnects() { + + } + + @Override + public void incrementDroppedCount() { + + } + + @Override + public void incrementOkCount() { + + } + + @Override + public void incrementErrCount() { + + } + + @Override + public void incrementExceptionCount() { + + } + + @Override + public void incrementRequestsSent() { + + } + + @Override + public void incrementRepliesReceived() { + + } + + @Override + public void incrementDuplicateRepliesReceived() { + + } + + @Override + public void incrementOrphanRepliesReceived() { + + } + + @Override + public void incrementInMsgs() { + + } + + @Override + public void incrementOutMsgs() { + outMsgs.incrementAndGet(); + } + + @Override + public void incrementInBytes(long bytes) { + + } + + @Override + public void incrementOutBytes(long bytes) { + + } + + @Override + public void incrementFlushCounter() { + + } + + @Override + public void incrementOutstandingRequests() { + + } + + @Override + public void decrementOutstandingRequests() { + + } + + @Override + public void registerRead(long bytes) { + + } + + @Override + public void registerWrite(long bytes) { + + } +}