Skip to content

Commit

Permalink
Add StatisticsCollector to Options for custom statistics tracking (#964)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnlcox authored Sep 7, 2023
1 parent 2c67241 commit 2a01825
Show file tree
Hide file tree
Showing 11 changed files with 814 additions and 100 deletions.
31 changes: 31 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ public class Options {
* {@link Builder#errorListener(ErrorListener) errorListener}.
*/
public static final String PROP_ERROR_LISTENER = PFX + "callback.error";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector}.
*/
public static final String PROP_STATISTICS_COLLECTOR = PFX + "statisticscollector";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#maxPingsOut(int) maxPingsOut}.
*/
Expand Down Expand Up @@ -556,6 +561,7 @@ public class Options {

private final ErrorListener errorListener;
private final ConnectionListener connectionListener;
private final StatisticsCollector statisticsCollector;
private final String dataPortType;

private final boolean trackAdvancedStats;
Expand Down Expand Up @@ -661,6 +667,7 @@ public static class Builder {

private ErrorListener errorListener = null;
private ConnectionListener connectionListener = null;
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
Expand Down Expand Up @@ -762,6 +769,7 @@ public Builder properties(Properties props) {

classnameProperty(props, PROP_ERROR_LISTENER, o -> this.errorListener = (ErrorListener) o);
classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o);
classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o);

stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s);
stringProperty(props, PROP_INBOX_PREFIX, this::inboxPrefix);
Expand Down Expand Up @@ -1327,6 +1335,19 @@ public Builder connectionListener(ConnectionListener listener) {
return this;
}

/**
* Set the {@link StatisticsCollector StatisticsCollector} to collect connection metrics.
* <p>
* If not set, then a default implementation will be used.
*
* @param collector the new StatisticsCollector for this connection.
* @return the Builder for chaining
*/
public Builder statisticsCollector(StatisticsCollector collector) {
this.statisticsCollector = collector;
return this;
}

/**
* Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a
* cached thread pool that names threads after the connection name (or a default). This executor
Expand Down Expand Up @@ -1532,6 +1553,7 @@ else if (useDefaultTls) {
new SynchronousQueue<>(),
new DefaultThreadFactory(threadPrefix));
}

return new Options(this);
}

Expand Down Expand Up @@ -1580,6 +1602,7 @@ public Builder(Options o) {

this.errorListener = o.errorListener;
this.connectionListener = o.connectionListener;
this.statisticsCollector = o.statisticsCollector;
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
Expand Down Expand Up @@ -1638,6 +1661,7 @@ private Options(Builder b) {

this.errorListener = b.errorListener == null ? new ErrorListenerLoggerImpl() : b.errorListener;
this.connectionListener = b.connectionListener;
this.statisticsCollector = b.statisticsCollector;
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
Expand Down Expand Up @@ -1689,6 +1713,13 @@ public ConnectionListener getConnectionListener() {
return this.connectionListener;
}

/**
* @return the statistics collector, or null, see {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector()} in the builder doc
*/
public StatisticsCollector getStatisticsCollector() {
return this.statisticsCollector;
}

/**
* @return the auth handler, or null, see {@link Builder#authHandler(AuthHandler) authHandler()} in the builder doc
*/
Expand Down
73 changes: 64 additions & 9 deletions src/main/java/io/nats/client/Statistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,88 @@
* <p>The Statistics toString() provides a summary of the statistics.
*/
public interface Statistics {

/**
* @return the total number of pings that have been sent from this connection.
*/
long getPings();

/**
* @return the total number of times this connection has tried to reconnect.
*/
long getReconnects();

/**
* @return the total number of messages dropped by this connection across all slow consumers.
*/
long getDroppedCount();

/**
* @return the total number of op +OKs received by this connection.
*/
long getOKs();

/**
* @return the total number of op -ERRs received by this connection.
*/
long getErrs();

/**
* @return the total number of exceptions seen by this connection.
*/
long getExceptions();

/**
* @return the total number of requests sent by this connection.
*/
long getRequestsSent();

/**
* @return the total number of replies received by this connection.
*/
long getRepliesReceived();

/**
* @return the total number of duplicate replies received by this connection.
*
* NOTE: This is only counted if advanced stats are enabled.
*/
long getDuplicateRepliesReceived();

/**
* @return the total number of orphan replies received by this connection.
*
* NOTE: This is only counted if advanced stats are enabled.
*/
long getOrphanRepliesReceived();

/**
* @return the total number of messages that have come in to this connection.
*/
public long getInMsgs();
long getInMsgs();

/**
* @return the total number of messages that have gone out of this connection.
*/
public long getOutMsgs();
long getOutMsgs();

/**
* @return the total number of message bytes that have come in to this connection.
*/
public long getInBytes();
long getInBytes();

/**
* @return the total number of message bytes that have gone out of to this connection.
* @return the total number of message bytes that have gone out of this connection.
*/
public long getOutBytes();
long getOutBytes();

/**
* @return the total number of times this connection has tried to reconnect.
* @return the total number of outgoing message flushes by this connection.
*/
public long getReconnects();
long getFlushCounter();

/**
* @return the total number of messages dropped by this connection across all slow consumers.
* @return the count of outstanding of requests from this connection.
*/
public long getDroppedCount();
long getOutstandingRequests();
}
131 changes: 131 additions & 0 deletions src/main/java/io/nats/client/StatisticsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

/**
* A collector for connection metrics.
* <p>
* Information about key metrics is incremented on this collector by the connection.
* <p>
* See {@link Statistics} for accessing the collected metrics.
*/
public interface StatisticsCollector extends Statistics {
/**
* Sets whether advanced stats are/should be tracked.
*/
void setAdvancedTracking(boolean trackAdvanced);

/**
* Increments the total number of pings that have been sent from this connection.
*/
void incrementPingCount();

/**
* Increments the total number of times this connection has tried to reconnect.
*/
void incrementReconnects();

/**
* Increments the total number of messages dropped by this connection across all slow consumers.
*/
void incrementDroppedCount();

/**
* Increments the total number of op +OKs received by this connection.
*/
void incrementOkCount();

/**
* Increments the total number of op -ERRs received by this connection.
*/
void incrementErrCount();

/**
* Increments the total number of exceptions seen by this connection.
*/
void incrementExceptionCount();

/**
* Increments the total number of requests sent by this connection.
*/
void incrementRequestsSent();

/**
* Increments the total number of replies received by this connection.
*/
void incrementRepliesReceived();

/**
* Increments the total number of duplicate replies received by this connection.
* <p>
* NOTE: This is only counted if advanced stats are enabled.
*/
void incrementDuplicateRepliesReceived();

/**
* Increments the total number of orphan replies received by this connection.
* <p>
* NOTE: This is only counted if advanced stats are enabled.
*/
void incrementOrphanRepliesReceived();

/**
* Increments the total number of messages that have come in to this connection.
*/
void incrementInMsgs();

/**
* Increments the total number of messages that have gone out of this connection.
*/
void incrementOutMsgs();

/**
* Increment the total number of message bytes that have come in to this connection.
*/
void incrementInBytes(long bytes);

/**
* Increment the total number of message bytes that have gone out of this connection.
*/
void incrementOutBytes(long bytes);

/**
* Increment the total number of outgoing message flushes by this connection.
*/
void incrementFlushCounter();

/**
* Increments the count of outstanding of requests from this connection.
*/
void incrementOutstandingRequests();

/**
* Decrements the count of outstanding of requests from this connection.
*/
void decrementOutstandingRequests();

/**
* Registers a Socket read by this connection.
* <p>
* NOTE: Implementations should only count this if advanced stats are enabled.
*/
void registerRead(long bytes);

/**
* Registers a Socket write by this connection.
* <p>
* NOTE: Implementations should only count this if advanced stats are enabled.
*/
void registerWrite(long bytes);
}
18 changes: 10 additions & 8 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class NatsConnection implements Connection {

private final Options options;

private final NatsStatistics statistics;
private final StatisticsCollector statistics;

private boolean connecting; // you can only connect in one thread
private boolean disconnecting; // you can only disconnect in one thread
Expand Down Expand Up @@ -111,7 +111,8 @@ class NatsConnection implements Connection {
this.options = options;

advancedTracking = options.isTrackAdvancedStats();
this.statistics = new NatsStatistics(advancedTracking);
this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
this.statistics.setAdvancedTracking(advancedTracking);

this.statusLock = new ReentrantLock();
this.statusChanged = this.statusLock.newCondition();
Expand Down Expand Up @@ -1231,11 +1232,12 @@ void deliverReply(Message msg) {
statistics.incrementRepliesReceived();
}
else if (!oldStyle && !subject.startsWith(mainInbox)) {
if (advancedTracking && responsesRespondedTo.get(key) != null) {
statistics.incrementDuplicateRepliesReceived();
}
else {
statistics.incrementOrphanRepliesReceived();
if (advancedTracking) {
if (responsesRespondedTo.get(key) != null) {
statistics.incrementDuplicateRepliesReceived();
} else {
statistics.incrementOrphanRepliesReceived();
}
}
}
}
Expand Down Expand Up @@ -1719,7 +1721,7 @@ public Statistics getStatistics() {
return this.statistics;
}

NatsStatistics getNatsStatistics() {
StatisticsCollector getNatsStatistics() {
return this.statistics;
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.nats.client.impl;

import io.nats.client.Options;
import io.nats.client.StatisticsCollector;

import java.io.IOException;
import java.nio.BufferOverflowException;
Expand Down Expand Up @@ -108,7 +109,7 @@ Future<Boolean> stop() {
return this.stopped;
}

synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, NatsStatistics stats)
synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats)
throws IOException {

int sendPosition = 0;
Expand Down Expand Up @@ -167,7 +168,7 @@ public void run() {

try {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
NatsStatistics stats = this.connection.getNatsStatistics();
StatisticsCollector stats = this.connection.getNatsStatistics();
int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER;

while (this.running.get()) {
Expand Down
Loading

0 comments on commit 2a01825

Please sign in to comment.