From 6317568352f5156a44528a8a529ed87cb6793ed0 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Thu, 17 Aug 2023 13:47:04 -0400 Subject: [PATCH] Service / Simplification Documentation and Release Prep (#954) --- README.md | 8 +- .../jetstream/simple/ContextExample.java | 1 - .../jetstream/simple/FetchBytesExample.java | 1 - .../simple/FetchMessagesExample.java | 1 - .../simple/IterableConsumerExample.java | 1 - .../simple/MessageConsumerExample.java | 1 - .../jetstream/simple/NextExample.java | 1 - .../jetstream/simple/QueueExample.java | 1 - .../nats/examples/service/ServiceExample.java | 2 +- .../io/nats/client/BaseConsumeOptions.java | 1 - .../io/nats/client/BaseConsumerContext.java | 1 - .../io/nats/client/BaseMessageConsumer.java | 1 - src/main/java/io/nats/client/Connection.java | 2 - .../java/io/nats/client/ConsumeOptions.java | 1 - .../java/io/nats/client/ConsumerContext.java | 1 - .../io/nats/client/FetchConsumeOptions.java | 1 - .../java/io/nats/client/FetchConsumer.java | 1 - src/main/java/io/nats/client/JetStream.java | 2 - src/main/java/io/nats/client/Message.java | 2 +- .../java/io/nats/client/MessageConsumer.java | 1 - src/main/java/io/nats/client/Nats.java | 2 - .../nats/client/OrderedConsumerContext.java | 3 +- src/main/java/io/nats/client/ServerPool.java | 1 - .../java/io/nats/client/StreamContext.java | 1 - .../nats/client/impl/NatsConsumerContext.java | 2 +- .../io/nats/client/impl/NatsJetStream.java | 66 +++---- .../impl/NatsOrderedConsumerContext.java | 2 +- .../nats/client/impl/NatsStreamContext.java | 2 +- .../io/nats/client/support/Validator.java | 18 +- src/main/java/io/nats/service/Discovery.java | 71 +++++++- src/main/java/io/nats/service/Endpoint.java | 91 +++++++++- .../java/io/nats/service/EndpointContext.java | 6 +- ...dpointResponse.java => EndpointStats.java} | 128 ++++++++++---- src/main/java/io/nats/service/Group.java | 35 +++- .../java/io/nats/service/InfoResponse.java | 7 +- .../java/io/nats/service/PingResponse.java | 7 +- src/main/java/io/nats/service/Service.java | 73 +++++++- .../java/io/nats/service/ServiceBuilder.java | 83 ++++++++- .../java/io/nats/service/ServiceEndpoint.java | 107 +++++++++++- .../java/io/nats/service/ServiceMessage.java | 165 +++++++----------- .../nats/service/ServiceMessageHandler.java | 5 +- .../java/io/nats/service/ServiceResponse.java | 31 ++-- .../java/io/nats/service/StatsResponse.java | 65 +++++-- .../java/io/nats/client/impl/PingTests.java | 11 +- .../io/nats/client/impl/ReconnectTests.java | 2 +- .../nats/client/impl/SimplificationTests.java | 3 + .../java/io/nats/service/ServiceTests.java | 81 +++++---- 47 files changed, 782 insertions(+), 317 deletions(-) rename src/main/java/io/nats/service/{EndpointResponse.java => EndpointStats.java} (59%) diff --git a/README.md b/README.md index 595a20ab6..341802190 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,9 @@ A [Java](http://java.com) client for the [NATS messaging system](https://nats.io [![Build Main Badge](https://github.com/nats-io/nats.java/actions/workflows/build-main.yml/badge.svg?event=push)](https://github.com/nats-io/nats.java/actions/workflows/build-main.yml) [![Release Badge](https://github.com/nats-io/nats.java/actions/workflows/build-release.yml/badge.svg?event=release)](https://github.com/nats-io/nats.java/actions/workflows/build-release.yml) -## BETA / Experimental News - ### Simplification -There is a new simplified api that makes working with streams and consumers well, simpler! +There is a new simplified api that makes working with streams and consumers well, simpler! Simplification is released as of 2.16.14. Check out the examples: @@ -28,7 +26,7 @@ Check out the examples: ### Service Framework -The service API allows you to easily build NATS services The services API is currently in beta functionality. +The service API allows you to easily build NATS services. The Service Framework is released as of 2.16.14 The Services Framework introduces a higher-level API for implementing services with NATS. NATS has always been a strong technology on which to build services, as they are easy to write, are location and DNS independent and can be scaled up or down by simply adding or removing instances of the service. @@ -50,7 +48,7 @@ Version 2.5.0 adds some back pressure to publish calls to alleviate issues when Previous versions are still available in the repo. -#### Version 2.16.14 Options properties improvements +#### Version 2.16.14 Options properties improvements In this release, support was added to * support properties keys with or without the prefix 'io.nats.client.' diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java index 506db0fc5..742488189 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java @@ -22,7 +22,6 @@ /** * This example will demonstrate simplified contexts - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class ContextExample { private static final String STREAM = "context-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java index 1696e600a..1c70ca0cb 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java @@ -23,7 +23,6 @@ /** * This example will demonstrate simplified fetch - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class FetchBytesExample { private static final String STREAM = "fetch-bytes-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java index 3578e0f30..115b24936 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java @@ -23,7 +23,6 @@ /** * This example will demonstrate simplified fetch - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class FetchMessagesExample { private static final String STREAM = "fetch-messages-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java index 6276c60cd..29a01418e 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java @@ -23,7 +23,6 @@ /** * This example will demonstrate simplified IterableConsumer where the developer calls nextMessage. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class IterableConsumerExample { private static final String STREAM = "iterable-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java index d751bb3e0..e6ec35c7b 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java @@ -25,7 +25,6 @@ /** * This example will demonstrate simplified consume with a handler - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class MessageConsumerExample { private static final String STREAM = "consume-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java index a70cac466..3d392a4b3 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java @@ -22,7 +22,6 @@ /** * This example will demonstrate simplified next - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class NextExample { private static final String STREAM = "next-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java b/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java index a50900c3b..5e56a0884 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java @@ -28,7 +28,6 @@ /** * This example will demonstrate simplified next - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class QueueExample { private static final String STREAM = "q-stream"; diff --git a/src/examples/java/io/nats/examples/service/ServiceExample.java b/src/examples/java/io/nats/examples/service/ServiceExample.java index e3a28ad5a..5692b27ab 100644 --- a/src/examples/java/io/nats/examples/service/ServiceExample.java +++ b/src/examples/java/io/nats/examples/service/ServiceExample.java @@ -29,7 +29,7 @@ import java.util.function.Supplier; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * This example demonstrates basic setup and use of the Service Framework */ public class ServiceExample { public static void main(String[] args) throws IOException { diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java index f71c1c616..be3937f45 100644 --- a/src/main/java/io/nats/client/BaseConsumeOptions.java +++ b/src/main/java/io/nats/client/BaseConsumeOptions.java @@ -16,7 +16,6 @@ /** * Base Consume Options are provided to customize the way the consume and * fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class BaseConsumeOptions { public static final int DEFAULT_MESSAGE_COUNT = 500; diff --git a/src/main/java/io/nats/client/BaseConsumerContext.java b/src/main/java/io/nats/client/BaseConsumerContext.java index 48961b27a..157fc2902 100644 --- a/src/main/java/io/nats/client/BaseConsumerContext.java +++ b/src/main/java/io/nats/client/BaseConsumerContext.java @@ -18,7 +18,6 @@ /** * The Consumer Context provides a convenient interface around a defined JetStream Consumer - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface BaseConsumerContext { diff --git a/src/main/java/io/nats/client/BaseMessageConsumer.java b/src/main/java/io/nats/client/BaseMessageConsumer.java index 7e34329fb..4a63c2009 100644 --- a/src/main/java/io/nats/client/BaseMessageConsumer.java +++ b/src/main/java/io/nats/client/BaseMessageConsumer.java @@ -16,7 +16,6 @@ /** * The BaseMessageConsumer interface is the core interface replacing * a subscription for a simplified consumer. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface BaseMessageConsumer extends AutoCloseable { /** diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index c23ac46ce..aeaf3477a 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -576,7 +576,6 @@ enum Status { /** * Get a consumer context for a specific named stream and specific named consumer. * Verifies that the stream and consumer exist. - * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream * @param consumerName the name of the consumer * @return a ConsumerContext object @@ -589,7 +588,6 @@ enum Status { /** * Get a consumer context for a specific named stream and specific named consumer. * Verifies that the stream and consumer exist. - * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream * @param consumerName the name of the consumer * @param options JetStream options. diff --git a/src/main/java/io/nats/client/ConsumeOptions.java b/src/main/java/io/nats/client/ConsumeOptions.java index 7e31b7fba..71425b251 100644 --- a/src/main/java/io/nats/client/ConsumeOptions.java +++ b/src/main/java/io/nats/client/ConsumeOptions.java @@ -15,7 +15,6 @@ /** * Consume Options are provided to customize the consume operation. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class ConsumeOptions extends BaseConsumeOptions { public static ConsumeOptions DEFAULT_CONSUME_OPTIONS = ConsumeOptions.builder().build(); diff --git a/src/main/java/io/nats/client/ConsumerContext.java b/src/main/java/io/nats/client/ConsumerContext.java index dcbb2b8c0..fad8ac553 100644 --- a/src/main/java/io/nats/client/ConsumerContext.java +++ b/src/main/java/io/nats/client/ConsumerContext.java @@ -19,7 +19,6 @@ /** * The Consumer Context provides a convenient interface around a defined JetStream Consumer - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface ConsumerContext extends BaseConsumerContext { /** diff --git a/src/main/java/io/nats/client/FetchConsumeOptions.java b/src/main/java/io/nats/client/FetchConsumeOptions.java index 64dc94c53..f3f8b8d52 100644 --- a/src/main/java/io/nats/client/FetchConsumeOptions.java +++ b/src/main/java/io/nats/client/FetchConsumeOptions.java @@ -15,7 +15,6 @@ /** * Fetch Consume Options are provided to customize the fetch operation. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class FetchConsumeOptions extends BaseConsumeOptions { public static FetchConsumeOptions DEFAULT_FETCH_OPTIONS = FetchConsumeOptions.builder().build(); diff --git a/src/main/java/io/nats/client/FetchConsumer.java b/src/main/java/io/nats/client/FetchConsumer.java index a5bc35230..137f300b1 100644 --- a/src/main/java/io/nats/client/FetchConsumer.java +++ b/src/main/java/io/nats/client/FetchConsumer.java @@ -16,7 +16,6 @@ /** * A fetch consumer gets messages by calling nextMessage. * nextMessage returns null when there are no more messages - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface FetchConsumer extends MessageConsumer { /** diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index b196ddb18..b26d40d5e 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -480,7 +480,6 @@ public interface JetStream { /** * Get a stream context for a specific named stream. Verifies that the stream exists. - * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream * @return a StreamContext object * @throws IOException covers various communication issues with the NATS @@ -492,7 +491,6 @@ public interface JetStream { /** * Get a consumer context for a specific named stream and specific named consumer. * Verifies that the stream and consumer exist. - * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream * @param consumerName the name of the consumer * @return a ConsumerContext object diff --git a/src/main/java/io/nats/client/Message.java b/src/main/java/io/nats/client/Message.java index 9b0da4179..a06a17bd4 100644 --- a/src/main/java/io/nats/client/Message.java +++ b/src/main/java/io/nats/client/Message.java @@ -47,7 +47,7 @@ public interface Message { boolean hasHeaders(); /** - * @return the headers object the message + * @return the headers object for the message */ Headers getHeaders(); diff --git a/src/main/java/io/nats/client/MessageConsumer.java b/src/main/java/io/nats/client/MessageConsumer.java index 5f87aa43d..d641b8a1b 100644 --- a/src/main/java/io/nats/client/MessageConsumer.java +++ b/src/main/java/io/nats/client/MessageConsumer.java @@ -20,7 +20,6 @@ /** * The MessageConsumer interface is the core interface replacing * a subscription for a simplified consumer. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface MessageConsumer extends AutoCloseable { /** diff --git a/src/main/java/io/nats/client/Nats.java b/src/main/java/io/nats/client/Nats.java index b5ec6f56b..50ba237e3 100644 --- a/src/main/java/io/nats/client/Nats.java +++ b/src/main/java/io/nats/client/Nats.java @@ -221,8 +221,6 @@ public static Connection connect(Options options) throws IOException, Interrupte *

If there is an exception before a connection is created, and the error * listener is set, it will be notified with a null connection. * - *

This method is experimental, please provide feedback on its value. - * * @param options the connection options * @param reconnectOnConnect if true, the connection will treat the initial * connection as any other and attempt reconnects on diff --git a/src/main/java/io/nats/client/OrderedConsumerContext.java b/src/main/java/io/nats/client/OrderedConsumerContext.java index 3063c90e9..fb0262ae3 100644 --- a/src/main/java/io/nats/client/OrderedConsumerContext.java +++ b/src/main/java/io/nats/client/OrderedConsumerContext.java @@ -14,8 +14,7 @@ package io.nats.client; /** - * The Consumer Context provides a convenient interface around a defined JetStream Consumer - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + * The Ordered Consumer and it's context provide a simplification interface to the ordered consumer behavior. */ public interface OrderedConsumerContext extends BaseConsumerContext { } diff --git a/src/main/java/io/nats/client/ServerPool.java b/src/main/java/io/nats/client/ServerPool.java index 6e538637f..c620308e0 100644 --- a/src/main/java/io/nats/client/ServerPool.java +++ b/src/main/java/io/nats/client/ServerPool.java @@ -19,7 +19,6 @@ /** * Allows the developer to provide the list of servers to try for connecting/reconnecting - * IMPORTANT! ServerPool IS CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE. */ public interface ServerPool { diff --git a/src/main/java/io/nats/client/StreamContext.java b/src/main/java/io/nats/client/StreamContext.java index af9ac137c..33a26ad4c 100644 --- a/src/main/java/io/nats/client/StreamContext.java +++ b/src/main/java/io/nats/client/StreamContext.java @@ -21,7 +21,6 @@ /** * The Stream Context provide a set of operations for managing the stream * and its contents and for managing consumers. - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface StreamContext { /** diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index 2c42e82f2..b04dd23a1 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -28,7 +28,7 @@ import static io.nats.client.impl.NatsJetStreamSubscription.EXPIRE_ADJUSTMENT; /** - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + * Implementation of Consumer Context */ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscriptionMaker { private final Object stateLock; diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 6649ca64e..323b5abbb 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -459,39 +459,39 @@ public List getChanges(ConsumerConfiguration serverCc) { ConsumerConfigurationComparer serverCcc = new ConsumerConfigurationComparer(serverCc); List changes = new ArrayList<>(); - if (deliverPolicy != null && deliverPolicy != serverCcc.getDeliverPolicy()) { changes.add("deliverPolicy"); }; - if (ackPolicy != null && ackPolicy != serverCcc.getAckPolicy()) { changes.add("ackPolicy"); }; - if (replayPolicy != null && replayPolicy != serverCcc.getReplayPolicy()) { changes.add("replayPolicy"); }; - - if (flowControl != null && flowControl != serverCcc.isFlowControl()) { changes.add("flowControl"); }; - if (headersOnly != null && headersOnly != serverCcc.isHeadersOnly()) { changes.add("headersOnly"); }; - if (memStorage != null && memStorage != serverCcc.isMemStorage()) { changes.add("memStorage"); }; - - if (startSeq != null && !startSeq.equals(serverCcc.getStartSequence())) { changes.add("startSequence"); }; - if (rateLimit != null && !rateLimit.equals(serverCcc.getRateLimit())) { changes.add("rateLimit"); }; - - if (maxDeliver != null && maxDeliver != serverCcc.getMaxDeliver()) { changes.add("maxDeliver"); }; - if (maxAckPending != null && maxAckPending != serverCcc.getMaxAckPending()) { changes.add("maxAckPending"); }; - if (maxPullWaiting != null && maxPullWaiting != serverCcc.getMaxPullWaiting()) { changes.add("maxPullWaiting"); }; - if (maxBatch != null && maxBatch != serverCcc.getMaxBatch()) { changes.add("maxBatch"); }; - if (maxBytes != null && maxBytes != serverCcc.getMaxBytes()) { changes.add("maxBytes"); }; - if (numReplicas != null && !numReplicas.equals(serverCcc.numReplicas)) { changes.add("numReplicas"); }; - - if (ackWait != null && !ackWait.equals(getOrUnset(serverCcc.ackWait))) { changes.add("ackWait"); }; - if (idleHeartbeat != null && !idleHeartbeat.equals(getOrUnset(serverCcc.idleHeartbeat))) { changes.add("idleHeartbeat"); }; - if (maxExpires != null && !maxExpires.equals(getOrUnset(serverCcc.maxExpires))) { changes.add("maxExpires"); }; - if (inactiveThreshold != null && !inactiveThreshold.equals(getOrUnset(serverCcc.inactiveThreshold))) { changes.add("inactiveThreshold"); }; - - if (startTime != null && !startTime.equals(serverCcc.startTime)) { changes.add("startTime"); }; - - if (filterSubject != null && !filterSubject.equals(serverCcc.filterSubject)) { changes.add("filterSubject"); }; - if (description != null && !description.equals(serverCcc.description)) { changes.add("description"); }; - if (sampleFrequency != null && !sampleFrequency.equals(serverCcc.sampleFrequency)) { changes.add("sampleFrequency"); }; - if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); }; - if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); }; - - if (backoff != null && !listsAreEqual(backoff, serverCcc.backoff, true)) { changes.add("backoff"); }; - if (metadata != null && !mapsAreEqual(metadata, serverCcc.metadata, true)) { changes.add("metadata"); }; + if (deliverPolicy != null && deliverPolicy != serverCcc.getDeliverPolicy()) { changes.add("deliverPolicy"); } + if (ackPolicy != null && ackPolicy != serverCcc.getAckPolicy()) { changes.add("ackPolicy"); } + if (replayPolicy != null && replayPolicy != serverCcc.getReplayPolicy()) { changes.add("replayPolicy"); } + + if (flowControl != null && flowControl != serverCcc.isFlowControl()) { changes.add("flowControl"); } + if (headersOnly != null && headersOnly != serverCcc.isHeadersOnly()) { changes.add("headersOnly"); } + if (memStorage != null && memStorage != serverCcc.isMemStorage()) { changes.add("memStorage"); } + + if (startSeq != null && !startSeq.equals(serverCcc.getStartSequence())) { changes.add("startSequence"); } + if (rateLimit != null && !rateLimit.equals(serverCcc.getRateLimit())) { changes.add("rateLimit"); } + + if (maxDeliver != null && maxDeliver != serverCcc.getMaxDeliver()) { changes.add("maxDeliver"); } + if (maxAckPending != null && maxAckPending != serverCcc.getMaxAckPending()) { changes.add("maxAckPending"); } + if (maxPullWaiting != null && maxPullWaiting != serverCcc.getMaxPullWaiting()) { changes.add("maxPullWaiting"); } + if (maxBatch != null && maxBatch != serverCcc.getMaxBatch()) { changes.add("maxBatch"); } + if (maxBytes != null && maxBytes != serverCcc.getMaxBytes()) { changes.add("maxBytes"); } + if (numReplicas != null && !numReplicas.equals(serverCcc.numReplicas)) { changes.add("numReplicas"); } + + if (ackWait != null && !ackWait.equals(getOrUnset(serverCcc.ackWait))) { changes.add("ackWait"); } + if (idleHeartbeat != null && !idleHeartbeat.equals(getOrUnset(serverCcc.idleHeartbeat))) { changes.add("idleHeartbeat"); } + if (maxExpires != null && !maxExpires.equals(getOrUnset(serverCcc.maxExpires))) { changes.add("maxExpires"); } + if (inactiveThreshold != null && !inactiveThreshold.equals(getOrUnset(serverCcc.inactiveThreshold))) { changes.add("inactiveThreshold"); } + + if (startTime != null && !startTime.equals(serverCcc.startTime)) { changes.add("startTime"); } + + if (filterSubject != null && !filterSubject.equals(serverCcc.filterSubject)) { changes.add("filterSubject"); } + if (description != null && !description.equals(serverCcc.description)) { changes.add("description"); } + if (sampleFrequency != null && !sampleFrequency.equals(serverCcc.sampleFrequency)) { changes.add("sampleFrequency"); } + if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); } + if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); } + + if (backoff != null && !listsAreEqual(backoff, serverCcc.backoff, true)) { changes.add("backoff"); } + if (metadata != null && !mapsAreEqual(metadata, serverCcc.metadata, true)) { changes.add("metadata"); } // do not need to check Durable because the original is retrieved by the durable name diff --git a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java index 5eafbb800..887e60547 100644 --- a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java @@ -20,7 +20,7 @@ import java.time.Duration; /** - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + * Implementation of Ordered Consumer Context */ public class NatsOrderedConsumerContext implements OrderedConsumerContext { NatsConsumerContext impl; diff --git a/src/main/java/io/nats/client/impl/NatsStreamContext.java b/src/main/java/io/nats/client/impl/NatsStreamContext.java index 875949224..44081ae0f 100644 --- a/src/main/java/io/nats/client/impl/NatsStreamContext.java +++ b/src/main/java/io/nats/client/impl/NatsStreamContext.java @@ -20,7 +20,7 @@ import java.util.List; /** - * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + * Implementation of Stream Context */ class NatsStreamContext implements StreamContext { final String streamName; diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index c4aeb9745..e59c2bb73 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -46,7 +46,7 @@ public static String validateSubject(String subject, String label, boolean requi } } else if (!segment.equals("*") && notPrintable(segment)) { - throw new IllegalArgumentException(label + " must be printable characters only."); + throw new IllegalArgumentException(label + " must be printable characters only."); } } return subject; @@ -75,10 +75,10 @@ public static String validateConsumerName(String s, boolean required) { public static String validatePrefixOrDomain(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (s.startsWith(DOT)) { - throw new IllegalArgumentException(label + " cannot start with `.` [" + s + "]"); + throw new IllegalArgumentException(label + " cannot start with '.' [" + s + "]"); } if (notPrintableOrHasWildGt(s)) { - throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include `*`, `>` [" + s + "]"); + throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include '*', '>' [" + s + "]"); } return s; }); @@ -182,7 +182,7 @@ public static String validatePrintable(String s, String label, boolean required) public static String validatePrintableExceptWildDotGt(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (notPrintableOrHasWildGtDot(s)) { - throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include `*`, `.` or `>` [" + s + "]"); + throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include '*', '.' or '>' [" + s + "]"); } return s; }); @@ -191,7 +191,7 @@ public static String validatePrintableExceptWildDotGt(String s, String label, bo public static String validatePrintableExceptWildDotGtSlashes(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (notPrintableOrHasWildGtDotSlashes(s)) { - throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include `*`, `.`, `>`, `\\` or `/` [" + s + "]"); + throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include '*', '.', '>', '\\' or '/' [" + s + "]"); } return s; }); @@ -200,7 +200,7 @@ public static String validatePrintableExceptWildDotGtSlashes(String s, String la public static String validatePrintableExceptWildGt(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (notPrintableOrHasWildGt(s)) { - throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include `*` or `>` [" + s + "]"); + throw new IllegalArgumentException(label + " must be in the printable ASCII range and cannot include '*' or '>' [" + s + "]"); } return s; }); @@ -209,7 +209,7 @@ public static String validatePrintableExceptWildGt(String s, String label, boole public static String validateIsRestrictedTerm(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (notRestrictedTerm(s)) { - throw new IllegalArgumentException(label + " must only contain A-Z, a-z, 0-9, `-` or `_` [" + s + "]"); + throw new IllegalArgumentException(label + " must only contain A-Z, a-z, 0-9, '-' or '_' [" + s + "]"); } return s; }); @@ -222,7 +222,7 @@ public static String validateBucketName(String s, boolean required) { public static String validateWildcardKvKey(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (notWildcardKvKey(s)) { - throw new IllegalArgumentException(label + " must only contain A-Z, a-z, 0-9, `*`, `-`, `_`, `/`, `=`, `>` or `.` and cannot start with `.` [" + s + "]"); + throw new IllegalArgumentException(label + " must only contain A-Z, a-z, 0-9, '*', '-', '_', '/', '=', '>' or '.' and cannot start with '.' [" + s + "]"); } return s; }); @@ -231,7 +231,7 @@ public static String validateWildcardKvKey(String s, String label, boolean requi public static String validateNonWildcardKvKey(String s, String label, boolean required) { return _validate(s, required, label, () -> { if (notNonWildcardKvKey(s)) { - throw new IllegalArgumentException(label + " must only contain A-Z, a-z, 0-9, `-`, `_`, `/`, `=` or `.` and cannot start with `.` [" + s + "]"); + throw new IllegalArgumentException(label + " must only contain A-Z, a-z, 0-9, '-', '_', '/', '=' or '.' and cannot start with '.' [" + s + "]"); } return s; }); diff --git a/src/main/java/io/nats/service/Discovery.java b/src/main/java/io/nats/service/Discovery.java index b8778f3e6..73da2341d 100644 --- a/src/main/java/io/nats/service/Discovery.java +++ b/src/main/java/io/nats/service/Discovery.java @@ -26,7 +26,12 @@ import static io.nats.service.Service.*; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Discovery is a utility class to help discover services by executing Ping, Info and Stats requests + * You are required to provide a connection. + * Optionally you can set 'maxTimeMillis' and 'maxResults'. When making a discovery request, + * the discovery will wait until the first one of those thresholds is reached before returning the results. + *

'maxTimeMillis' defaults to {@value DEFAULT_DISCOVERY_MAX_TIME_MILLIS}

+ *

'maxResults' defaults tp {@value DEFAULT_DISCOVERY_MAX_RESULTS}

*/ public class Discovery { public static final long DEFAULT_DISCOVERY_MAX_TIME_MILLIS = 5000; @@ -38,10 +43,20 @@ public class Discovery { private Supplier inboxSupplier; + /** + * Construct a Discovery instance with a connection and default maxTimeMillis / maxResults + * @param conn the NATS Connection + */ public Discovery(Connection conn) { - this(conn, -1, -1); + this(conn, 0, 0); } + /** + * Construct a Discovery instance + * @param conn the NATS Connection + * @param maxTimeMillis the maximum time to wait for discovery requests to complete or any number less than 1 to use the default + * @param maxResults the maximum number of results to wait for or any number less than 1 to use the default + */ public Discovery(Connection conn, long maxTimeMillis, int maxResults) { this.conn = conn; this.maxTimeMillis = maxTimeMillis < 1 ? DEFAULT_DISCOVERY_MAX_TIME_MILLIS : maxTimeMillis; @@ -49,6 +64,10 @@ public Discovery(Connection conn, long maxTimeMillis, int maxResults) { setInboxSupplier(null); } + /** + * Override the normal inbox with a custom inbox to support you security model + * @param inboxSupplier the supplier + */ public void setInboxSupplier(Supplier inboxSupplier) { this.inboxSupplier = inboxSupplier == null ? conn::createInbox : inboxSupplier; } @@ -56,16 +75,32 @@ public void setInboxSupplier(Supplier inboxSupplier) { // ---------------------------------------------------------------------------------------------------- // ping // ---------------------------------------------------------------------------------------------------- + + /** + * Make a ping request to all services running on the server. + * @return the list of {@link PingResponse} + */ public List ping() { return ping(null); } + /** + * Make a ping request only to services having the matching service name + * @param serviceName the service name + * @return the list of {@link PingResponse} + */ public List ping(String serviceName) { List list = new ArrayList<>(); discoverMany(SRV_PING, serviceName, jsonBytes -> list.add(new PingResponse(jsonBytes))); return list; } + /** + * Make a ping request to a specific instance of a service having matching service name and id + * @param serviceName the service name + * @param serviceId the specific service id + * @return the list of {@link PingResponse} + */ public PingResponse ping(String serviceName, String serviceId) { byte[] jsonBytes = discoverOne(SRV_PING, serviceName, serviceId); return jsonBytes == null ? null : new PingResponse(jsonBytes); @@ -74,16 +109,32 @@ public PingResponse ping(String serviceName, String serviceId) { // ---------------------------------------------------------------------------------------------------- // info // ---------------------------------------------------------------------------------------------------- + + /** + * Make an info request to all services running on the server. + * @return the list of {@link InfoResponse} + */ public List info() { return info(null); } + /** + * Make an info request only to services having the matching service name + * @param serviceName the service name + * @return the list of {@link InfoResponse} + */ public List info(String serviceName) { List list = new ArrayList<>(); discoverMany(SRV_INFO, serviceName, jsonBytes -> list.add(new InfoResponse(jsonBytes))); return list; } + /** + * Make an info request to a specific instance of a service having matching service name and id + * @param serviceName the service name + * @param serviceId the specific service id + * @return the list of {@link InfoResponse} + */ public InfoResponse info(String serviceName, String serviceId) { byte[] jsonBytes = discoverOne(SRV_INFO, serviceName, serviceId); return jsonBytes == null ? null : new InfoResponse(jsonBytes); @@ -92,16 +143,32 @@ public InfoResponse info(String serviceName, String serviceId) { // ---------------------------------------------------------------------------------------------------- // stats // ---------------------------------------------------------------------------------------------------- + + /** + * Make a stats request to all services running on the server. + * @return the list of {@link StatsResponse} + */ public List stats() { return stats(null); } + /** + * Make a stats request only to services having the matching service name + * @param serviceName the service name + * @return the list of {@link StatsResponse} + */ public List stats(String serviceName) { List list = new ArrayList<>(); discoverMany(SRV_STATS, serviceName, jsonBytes -> list.add(new StatsResponse(jsonBytes))); return list; } + /** + * Make a stats request to a specific instance of a service having matching service name and id + * @param serviceName the service name + * @param serviceId the specific service id + * @return the list of {@link StatsResponse} + */ public StatsResponse stats(String serviceName, String serviceId) { byte[] jsonBytes = discoverOne(SRV_STATS, serviceName, serviceId); return jsonBytes == null ? null : new StatsResponse(jsonBytes); diff --git a/src/main/java/io/nats/service/Endpoint.java b/src/main/java/io/nats/service/Endpoint.java index c14eafb60..339d7aaab 100644 --- a/src/main/java/io/nats/service/Endpoint.java +++ b/src/main/java/io/nats/service/Endpoint.java @@ -17,6 +17,7 @@ import io.nats.client.support.JsonUtils; import io.nats.client.support.JsonValue; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -28,25 +29,50 @@ import static io.nats.client.support.Validator.validateSubject; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Endpoint encapsulates the name, subject and metadata for a {@link ServiceEndpoint}. + *

Endpoints can be used directly or as part of a group. {@link ServiceEndpoint} and {@link Group}

+ *

Endpoint names and subjects are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'

+ *

To create an Endpoint, either use a direct constructor or use the Endpoint builder + * via the static method builder() or new Endpoint.Builder() to get an instance. + *

*/ public class Endpoint implements JsonSerializable { private final String name; private final String subject; private final Map metadata; + /** + * Directly construct an Endpoint with a name, which becomes the subject + * @param name the name + */ public Endpoint(String name) { this(name, null, null, true); } + /** + * Directly construct an Endpoint with a name, which becomes the subject, and metadata + * @param name the name + * @param metadata the metadata + */ public Endpoint(String name, Map metadata) { this(name, null, metadata, true); } + /** + * Directly construct an Endpoint with a name and a subject + * @param name the name + * @param subject the subject + */ public Endpoint(String name, String subject) { this(name, subject, null, true); } + /** + * Directly construct an Endpoint with a name, the subject, and metadata + * @param name the name + * @param subject the subject + * @param metadata the metadata + */ public Endpoint(String name, String subject, Map metadata) { this(name, subject, metadata, true); } @@ -66,7 +92,7 @@ public Endpoint(String name, String subject, Map metadata) { this.name = name; this.subject = subject; } - this.metadata = metadata == null || metadata.size() == 0 ? null : metadata; + this.metadata = metadata == null || metadata.isEmpty() ? null : metadata; } Endpoint(JsonValue vEndpoint) { @@ -93,48 +119,99 @@ public String toString() { return JsonUtils.toKey(getClass()) + toJson(); } + /** + * Get the name of the Endpoint + * @return the name + */ public String getName() { return name; } + /** + * Get the subject of the Endpoint + * @return the subject + */ public String getSubject() { return subject; } + /** + * Get a copy of the metadata of the Endpoint + * @return the copy of metadata + */ public Map getMetadata() { - return metadata; + return metadata == null ? null : new HashMap<>(metadata); } + /** + * Get an instance of an Endpoint builder. + * @return the instance + */ public static Builder builder() { return new Builder(); } + /** + * Build an Endpoint using a fluent builder. + */ public static class Builder { private String name; private String subject; private Map metadata; + /** + * Construct the builder + */ + public Builder() {} + + /** + * Copy the Endpoint, replacing all existing endpoint information. + * @param endpoint the endpoint to copy + * @return the Endpoint.Builder + */ public Builder endpoint(Endpoint endpoint) { - name = endpoint.getName(); - subject = endpoint.getSubject(); - return this; + return name(endpoint.getName()).subject(endpoint.getSubject()).metadata(endpoint.getMetadata()); } + /** + * Set the name for the Endpoint, replacing any name already set. + * @param name the endpoint name + * @return the Endpoint.Builder + */ public Builder name(String name) { this.name = name; return this; } + /** + * Set the subject for the Endpoint, replacing any subject already set. + * @param subject the subject + * @return the Endpoint.Builder + */ public Builder subject(String subject) { this.subject = subject; return this; } + /** + * Set the metadata for the Endpoint, replacing any metadata already set. + * @param metadata the metadata + * @return the Endpoint.Builder + */ public Builder metadata(Map metadata) { - this.metadata = metadata; + if (metadata == null || metadata.isEmpty()) { + this.metadata = null; + } + else { + this.metadata = new HashMap<>(metadata); + } return this; } + /** + * Build the Endpoint instance. + * @return the Endpoint instance + */ public Endpoint build() { return new Endpoint(this); } diff --git a/src/main/java/io/nats/service/EndpointContext.java b/src/main/java/io/nats/service/EndpointContext.java index ebda74fc0..ccef1acfa 100644 --- a/src/main/java/io/nats/service/EndpointContext.java +++ b/src/main/java/io/nats/service/EndpointContext.java @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicLong; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Internal class to support service implementation */ class EndpointContext { private static final String QGROUP = "q"; @@ -86,8 +86,8 @@ public void onMessage(Message msg) throws InterruptedException { } } - EndpointResponse getEndpointStats() { - return new EndpointResponse( + EndpointStats getEndpointStats() { + return new EndpointStats( se.getEndpoint().getName(), se.getSubject(), numRequests.get(), diff --git a/src/main/java/io/nats/service/EndpointResponse.java b/src/main/java/io/nats/service/EndpointStats.java similarity index 59% rename from src/main/java/io/nats/service/EndpointResponse.java rename to src/main/java/io/nats/service/EndpointStats.java index aa87aa742..6153251f1 100644 --- a/src/main/java/io/nats/service/EndpointResponse.java +++ b/src/main/java/io/nats/service/EndpointStats.java @@ -28,9 +28,47 @@ import static io.nats.client.support.JsonValueUtils.*; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Endpoints stats contains various stats and custom data for an endpoint. + * + * { + * "id": "ZP1oVevzLGu4CBORMXKKke", + * "name": "Service1", + * "version": "0.0.1", + * "endpoints": [{ + * "name": "SortEndpointAscending", + * "subject": "sort.ascending", + * "num_requests": 1, + * "processing_time": 538900, + * "average_processing_time": 538900, + * "started": "2023-08-15T13:51:41.318000000Z" + * } + * + * + * { + * "name": "SortEndpointDescending", + * "subject": "sort.descending", + * "num_requests": 1, + * "processing_time": 88400, + * "average_processing_time": 88400, + * "started": "2023-08-15T13:51:41.318000000Z" + * } + * + * + * { + * "name": "EchoEndpoint", + * "subject": "echo", + * "num_requests": 5, + * "processing_time": 1931600, + * "average_processing_time": 386320, + * "data": { + * "idata": 2, + * "sdata": "s-996409223" + * }, + * "started": "2023-08-15T13:51:41.318000000Z" + * } + * */ -public class EndpointResponse implements JsonSerializable { +public class EndpointStats implements JsonSerializable { private final String name; private final String subject; private final long numRequests; @@ -41,12 +79,11 @@ public class EndpointResponse implements JsonSerializable { private final JsonValue data; private final ZonedDateTime started; - static List listOf(JsonValue vEndpointStats) { - return JsonValueUtils.listOf(vEndpointStats, EndpointResponse::new); + static List listOf(JsonValue vEndpointStats) { + return JsonValueUtils.listOf(vEndpointStats, EndpointStats::new); } - // This is for stats - EndpointResponse(String name, String subject, long numRequests, long numErrors, long processingTime, String lastError, JsonValue data, ZonedDateTime started) { + EndpointStats(String name, String subject, long numRequests, long numErrors, long processingTime, String lastError, JsonValue data, ZonedDateTime started) { this.name = name; this.subject = subject; this.numRequests = numRequests; @@ -58,29 +95,16 @@ static List listOf(JsonValue vEndpointStats) { this.started = started; } - // This is for schema - EndpointResponse(String name, String subject) { - this.name = name; - this.subject = subject; - this.numRequests = 0; - this.numErrors = 0; - this.processingTime = 0; - this.averageProcessingTime = 0; - this.lastError = null; - this.data = null; - this.started = null; - } - - EndpointResponse(JsonValue vEndpointResponse) { - name = readString(vEndpointResponse, NAME); - subject = readString(vEndpointResponse, SUBJECT); - numRequests = readLong(vEndpointResponse, NUM_REQUESTS, 0); - numErrors = readLong(vEndpointResponse, NUM_ERRORS, 0); - processingTime = readLong(vEndpointResponse, PROCESSING_TIME, 0); - averageProcessingTime = readLong(vEndpointResponse, AVERAGE_PROCESSING_TIME, 0); - lastError = readString(vEndpointResponse, LAST_ERROR); - data = readValue(vEndpointResponse, DATA); - started = readDate(vEndpointResponse, STARTED); + EndpointStats(JsonValue vEndpointStats) { + name = readString(vEndpointStats, NAME); + subject = readString(vEndpointStats, SUBJECT); + numRequests = readLong(vEndpointStats, NUM_REQUESTS, 0); + numErrors = readLong(vEndpointStats, NUM_ERRORS, 0); + processingTime = readLong(vEndpointStats, PROCESSING_TIME, 0); + averageProcessingTime = readLong(vEndpointStats, AVERAGE_PROCESSING_TIME, 0); + lastError = readString(vEndpointStats, LAST_ERROR); + data = readValue(vEndpointStats, DATA); + started = readDate(vEndpointStats, STARTED); } @Override @@ -98,38 +122,82 @@ public String toJson() { return endJson(sb).toString(); } + /** + * Get the name of the Endpoint + * @return the name + */ public String getName() { return name; } + /** + * Get the subject of the Endpoint + * @return the subject + */ public String getSubject() { return subject; } + /** + * The number of requests received by the endpoint + * @return the number of requests + */ public long getNumRequests() { return numRequests; } + /** + * Number of errors that the endpoint has raised + * @return the number of errors + */ public long getNumErrors() { return numErrors; } + /** + * Total processing time for the endpoint + * @return the total processing time + */ public long getProcessingTime() { return processingTime; } + /** + * Average processing time is the total processing time divided by the num requests + * @return the average processing time + */ public long getAverageProcessingTime() { return averageProcessingTime; } + /** + * If set, the last error triggered by the endpoint + * @return the last error or null + */ public String getLastError() { return lastError; } + /** + * A field that can be customized with any data as returned by stats handler + * @return the JsonValue object representing the data + */ public JsonValue getData() { return data; } + /** + * The json representation of the custom data. May be null + * @return the json + */ + public String getDataAsJson() { + return data == null ? null : data.toJson(); + } + + /** + * Get the time the endpoint was started (or restarted) + * @return the start time + */ public ZonedDateTime getStarted() { return started; } @@ -144,7 +212,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - EndpointResponse that = (EndpointResponse) o; + EndpointStats that = (EndpointStats) o; if (numRequests != that.numRequests) return false; if (numErrors != that.numErrors) return false; diff --git a/src/main/java/io/nats/service/Group.java b/src/main/java/io/nats/service/Group.java index 96457cd7b..c6d65ac39 100644 --- a/src/main/java/io/nats/service/Group.java +++ b/src/main/java/io/nats/service/Group.java @@ -19,16 +19,28 @@ import static io.nats.client.support.Validator.validateSubject; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Group is way to organize endpoints by serving as a common prefix to all endpoints registered in it. */ public class Group { private final String name; private Group next; + /** + * Construct a group. + *

Group names and subjects are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'

+ * @param name the group name + */ public Group(String name) { this.name = validateSubject(name, "Group Name", true, true); } + /** + * Append a group at the end of the list of groups this group starts or is a part of. + * Appended groups can be traversed by doing {@link #getNext} + * Subsequent appends add the group to the end of the list. + * @param group the group to append + * @return like a fluent builder, return the Group instance + */ public Group appendGroup(Group group) { Group last = this; while (last.next != null) { @@ -38,14 +50,35 @@ public Group appendGroup(Group group) { return this; } + /** + * Get the resolved subject of a group by concatenating the group name and any groups. + * For example, this: + * + * Group g = new Group("A") + * .appendGroup(new Group("B")) + * .appendGroup(new Group("C")) + * .appendGroup(new Group("D")); + * System.out.println(g.getSubject()); + * + * prints "A.B.C.D" + * @return the subject + */ public String getSubject() { return next == null ? name : name + DOT + next.getSubject(); } + /** + * Get the name of the group. + * @return the name + */ public String getName() { return name; } + /** + * Get the next group after this group. May be null + * @return the next group + */ public Group getNext() { return next; } diff --git a/src/main/java/io/nats/service/InfoResponse.java b/src/main/java/io/nats/service/InfoResponse.java index 06eb4ee58..ac27a6c4d 100644 --- a/src/main/java/io/nats/service/InfoResponse.java +++ b/src/main/java/io/nats/service/InfoResponse.java @@ -26,7 +26,8 @@ import static io.nats.client.support.JsonValueUtils.*; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Info response class forms the info json payload, for example: + * {"id":"JlkwZvmHAXCQGwwxiPwaBJ","name":"MyService","version":"0.0.1","endpoints":[{"name":"MyEndpoint","subject":"myend"}],"type":"io.nats.micro.v1.info_response"} */ public class InfoResponse extends ServiceResponse { public static final String TYPE = "io.nats.micro.v1.info_response"; @@ -34,13 +35,13 @@ public class InfoResponse extends ServiceResponse { private final String description; private final List endpoints; - public InfoResponse(String id, String name, String version, Map metadata, String description, List endpoints) { + InfoResponse(String id, String name, String version, Map metadata, String description, List endpoints) { super(TYPE, id, name, version, metadata); this.description = description; this.endpoints = endpoints; } - public InfoResponse(byte[] jsonBytes) { + InfoResponse(byte[] jsonBytes) { this(parseMessage(jsonBytes)); } diff --git a/src/main/java/io/nats/service/PingResponse.java b/src/main/java/io/nats/service/PingResponse.java index 8fa16a666..70ce802b1 100644 --- a/src/main/java/io/nats/service/PingResponse.java +++ b/src/main/java/io/nats/service/PingResponse.java @@ -16,16 +16,17 @@ import java.util.Map; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Ping response class forms the ping json payload, for example: + * {"id":"JlkwZvmHAXCQGwwxiPwaBJ","name":"MyService","version":"0.0.1","type":"io.nats.micro.v1.ping_response"} */ public class PingResponse extends ServiceResponse { public static final String TYPE = "io.nats.micro.v1.ping_response"; - public PingResponse(String id, String name, String version, Map metadata) { + PingResponse(String id, String name, String version, Map metadata) { super(TYPE, id, name, version, metadata); } - public PingResponse(byte[] jsonBytes) { + PingResponse(byte[] jsonBytes) { super(TYPE, parseMessage(jsonBytes)); } } diff --git a/src/main/java/io/nats/service/Service.java b/src/main/java/io/nats/service/Service.java index 7ccf69a9d..b2a4e960c 100644 --- a/src/main/java/io/nats/service/Service.java +++ b/src/main/java/io/nats/service/Service.java @@ -32,7 +32,10 @@ import static io.nats.client.support.Validator.nullOrEmpty; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * The Services Framework introduces a higher-level API for implementing services with NATS. + * Services automatically contain Ping, Info and Stats responders. + * Services have one or more service endpoints. {@link ServiceEndpoint} + * When multiple instances of a service endpoints are active they work in a queue, meaning only one listener responds to any given request. */ public class Service { public static final String SRV_PING = "PING"; @@ -139,6 +142,10 @@ static String toDiscoverySubject(String discoveryName, String optionalServiceNam return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment + "." + optionalServiceIdSegment; } + /** + * Start the service + * @return a future that can be held to see if another thread called stop + */ public CompletableFuture startService() { synchronized (startStopLock) { if (runningIndicator == null) { @@ -155,22 +162,42 @@ public CompletableFuture startService() { } } + /** + * Get an instance of a ServiceBuilder. + * @return the instance + */ public static ServiceBuilder builder() { return new ServiceBuilder(); } + /** + * Stop the service by draining. + */ public void stop() { stop(true, null); } + /** + * Stop the service by draining. Mark the future that was received from the start method that the service completed exceptionally. + * @param t the error cause + */ public void stop(Throwable t) { stop(true, t); } + /** + * Stop the service, optionally draining. + * @param drain the flag indicating to drain or not + */ public void stop(boolean drain) { stop(drain, null); } + /** + * Stop the service, optionally draining and optionally with an error cause + * @param drain the flag indicating to drain or not + * @param t the optional error cause. If supplied, mark the future that was received from the start method that the service completed exceptionally + */ public void stop(boolean drain, Throwable t) { synchronized (startStopLock) { if (runningIndicator != null) { @@ -231,6 +258,9 @@ public void stop(boolean drain, Throwable t) { } } + /** + * Reset the statistics for the endpoints + */ public void reset() { started = DateTimeUtils.gmtNow(); for (EndpointContext c : discoveryContexts) { @@ -241,43 +271,80 @@ public void reset() { } } + /** + * Get the id of the service + * @return the id + */ public String getId() { return infoResponse.getId(); } + /** + * Get the name of the service + * @return the name + */ public String getName() { return infoResponse.getName(); } + /** + * Get the version of the service + * @return the version + */ public String getVersion() { return infoResponse.getVersion(); } + /** + * Get the description of the service + * @return the description + */ public String getDescription() { return infoResponse.getDescription(); } + /** + * Get the drain timeout setting + * @return the drain timeout setting + */ public Duration getDrainTimeout() { return drainTimeout; } + /** + * Get the pre-constructed ping response. + * @return the ping response + */ public PingResponse getPingResponse() { return pingResponse; } + /** + * Get the pre-constructed info response. + * @return the info response + */ public InfoResponse getInfoResponse() { return infoResponse; } + /** + * Get the up-to-date stats response which contains a list of all {@link EndpointStats} + * @return the stats response + */ public StatsResponse getStatsResponse() { - List endpointStats = new ArrayList<>(); + List endpointStats = new ArrayList<>(); for (EndpointContext c : serviceContexts.values()) { endpointStats.add(c.getEndpointStats()); } return new StatsResponse(pingResponse, started, endpointStats); } - public EndpointResponse getEndpointStats(String endpointName) { + /** + * Get the up-to-date {@link EndpointStats} for a specific endpoint + * @param endpointName the endpoint name + * @return the EndpointStats or null if the name is not found. + */ + public EndpointStats getEndpointStats(String endpointName) { EndpointContext c = serviceContexts.get(endpointName); return c == null ? null : c.getEndpointStats(); } diff --git a/src/main/java/io/nats/service/ServiceBuilder.java b/src/main/java/io/nats/service/ServiceBuilder.java index e526c5184..59fd6e824 100644 --- a/src/main/java/io/nats/service/ServiceBuilder.java +++ b/src/main/java/io/nats/service/ServiceBuilder.java @@ -9,15 +9,19 @@ import static io.nats.client.support.Validator.*; +/** + * Build a Service using a fluent builder. + * Use the Service static method builder() or new ServiceBuilder() to get an instance. + */ public class ServiceBuilder { - public static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(5); + public static final long DEFAULT_DRAIN_TIMEOUT_MILLIS = 5000; + public static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofMillis(DEFAULT_DRAIN_TIMEOUT_MILLIS); Connection conn; String name; String description; String version; Map metadata; - String apiUrl; final Map serviceEndpoints = new HashMap<>(); Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT; Dispatcher pingDispatcher; @@ -25,61 +29,130 @@ public class ServiceBuilder { Dispatcher schemaDispatcher; Dispatcher statsDispatcher; + /** + * The connection the service runs on + * @param conn connection + * @return the ServiceBuilder + */ public ServiceBuilder connection(Connection conn) { this.conn = conn; return this; } + /** + * The simple name of the service + * @param name the name + * @return the ServiceBuilder + */ public ServiceBuilder name(String name) { this.name = validateIsRestrictedTerm(name, "Service Name", true); return this; } + /** + * The simple description of the service + * @param description the description + * @return the ServiceBuilder + */ public ServiceBuilder description(String description) { this.description = description; return this; } + /** + * The simple version of the service. + * @param version the version + * @return the ServiceBuilder + */ public ServiceBuilder version(String version) { this.version = validateSemVer(version, "Service Version", true); return this; } + /** + * Any meta information about this service + * @param metadata the meta + * @return the ServiceBuilder + */ public ServiceBuilder metadata(Map metadata) { this.metadata = metadata; return this; } - public ServiceBuilder addServiceEndpoint(ServiceEndpoint endpoint) { - serviceEndpoints.put(endpoint.getName(), endpoint); + /** + * Add a service endpoint into the service. There can only be one instance of a service endpoint by name + * @param serviceEndpoint the service endpoint + * @return the ServiceBuilder + */ + public ServiceBuilder addServiceEndpoint(ServiceEndpoint serviceEndpoint) { + serviceEndpoints.put(serviceEndpoint.getName(), serviceEndpoint); return this; } + /** + * The timeout when stopping a service. Defaults to {@value #DEFAULT_DRAIN_TIMEOUT_MILLIS} milliseconds + * @param drainTimeout the drain timeout + * @return the ServiceBuilder + */ public ServiceBuilder drainTimeout(Duration drainTimeout) { - this.drainTimeout = drainTimeout; + this.drainTimeout = drainTimeout == null ? DEFAULT_DRAIN_TIMEOUT : drainTimeout; return this; } + /** + * The timeout when stopping a service. Defaults to {@value #DEFAULT_DRAIN_TIMEOUT_MILLIS} milliseconds + * @param drainTimeoutMillis the drain timeout in milliseconds + * @return the ServiceBuilder + */ + public ServiceBuilder drainTimeout(long drainTimeoutMillis) { + this.drainTimeout = Duration.ofMillis(drainTimeoutMillis); + return this; + } + + /** + * Optional dispatcher for the ping service + * @param pingDispatcher the dispatcher + * @return the ServiceBuilder + */ public ServiceBuilder pingDispatcher(Dispatcher pingDispatcher) { this.pingDispatcher = pingDispatcher; return this; } + /** + * Optional dispatcher for the info service + * @param infoDispatcher the dispatcher + * @return the ServiceBuilder + */ public ServiceBuilder infoDispatcher(Dispatcher infoDispatcher) { this.infoDispatcher = infoDispatcher; return this; } + /** + * Optional dispatcher for the schema service + * @param schemaDispatcher the dispatcher + * @return the ServiceBuilder + */ public ServiceBuilder schemaDispatcher(Dispatcher schemaDispatcher) { this.schemaDispatcher = schemaDispatcher; return this; } + /** + * Optional dispatcher for the stats service + * @param statsDispatcher the dispatcher + * @return the ServiceBuilder + */ public ServiceBuilder statsDispatcher(Dispatcher statsDispatcher) { this.statsDispatcher = statsDispatcher; return this; } + /** + * Build the Service instance. + * @return the Service instance + */ public Service build() { required(conn, "Connection"); required(name, "Name"); diff --git a/src/main/java/io/nats/service/ServiceEndpoint.java b/src/main/java/io/nats/service/ServiceEndpoint.java index f1c81d2cc..d36385f4d 100644 --- a/src/main/java/io/nats/service/ServiceEndpoint.java +++ b/src/main/java/io/nats/service/ServiceEndpoint.java @@ -17,12 +17,22 @@ import io.nats.client.support.JsonValue; import io.nats.client.support.Validator; +import java.util.Map; import java.util.function.Supplier; import static io.nats.client.support.NatsConstants.DOT; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * The ServiceEndpoint represents the working {@link Endpoint} + *
    + *
  • It allows the endpoint to be grouped.
  • + *
  • It is where you can define the handler that will respond to incoming requests
  • + *
  • It allows you to define it's dispatcher if desired giving granularity to threads running subscribers
  • + *
  • It gives you a hook to provide custom data for the {@link EndpointStats}
  • + *
+ *

To create a ServiceEndpoint, use the ServiceEndpoint builder, which can be instantiated + * via the static method builder() or new ServiceEndpoint.Builder() to get an instance. + *

*/ public class ServiceEndpoint { private final Group group; @@ -48,80 +58,161 @@ private ServiceEndpoint(Builder b, Endpoint e) { this.statsDataSupplier = null; } + /** + * Get the name of the {@link Endpoint} + * @return the endpoint name + */ public String getName() { return endpoint.getName(); } + /** + * Get the subject of the ServiceEndpoint which takes into account the group path and the {@link Endpoint} subject + * @return the endpoint subject + */ public String getSubject() { return group == null ? endpoint.getSubject() : group.getSubject() + DOT + endpoint.getSubject(); } - public Group getGroup() { + /** + * Get the name of the {@link Group} + * @return the group name or null if there is no group + */ + public String getGroupName() { + return group == null ? null : group.getName(); + } + + /** + * Get a copy of the metadata of the {@link Endpoint} + * @return the copy of endpoint metadata + */ + public Map getMetadata() { + return endpoint.getMetadata(); + } + + protected Group getGroup() { return group; } - public Endpoint getEndpoint() { + protected Endpoint getEndpoint() { return endpoint; } - public ServiceMessageHandler getHandler() { + protected ServiceMessageHandler getHandler() { return handler; } - public Dispatcher getDispatcher() { + protected Dispatcher getDispatcher() { return dispatcher; } - public Supplier getStatsDataSupplier() { + protected Supplier getStatsDataSupplier() { return statsDataSupplier; } + /** + * Get an instance of a ServiceEndpoint Builder. + * @return the instance + */ public static Builder builder() { return new Builder(); } + /** + * Build an ServiceEndpoint using a fluent builder. + */ public static class Builder { private Group group; private ServiceMessageHandler handler; private Dispatcher dispatcher; private Supplier statsDataSupplier; - private final Endpoint.Builder endpointBuilder = Endpoint.builder(); + private Endpoint.Builder endpointBuilder = Endpoint.builder(); + /** + * Set the {@link Group} for this ServiceEndpoint + * @param group the group + * @return the ServiceEndpoint.Builder + */ public Builder group(Group group) { this.group = group; return this; } + /** + * Set the {@link Endpoint} for this ServiceEndpoint + * replacing all existing endpoint information. + * @param endpoint the endpoint to clone + * @return the ServiceEndpoint.Builder + */ public Builder endpoint(Endpoint endpoint) { - endpointBuilder.endpoint(endpoint); + endpointBuilder = Endpoint.builder().endpoint(endpoint); return this; } + /** + * Set the name for the {@link Endpoint} for this ServiceEndpoint replacing any name already set. + * @param name the endpoint name + * @return the ServiceEndpoint.Builder + */ public Builder endpointName(String name) { endpointBuilder.name(name); return this; } + /** + * Set the subject for the {@link Endpoint} for this ServiceEndpoint replacing any subject already set. + * @param subject the subject + * @return the ServiceEndpoint.Builder + */ public Builder endpointSubject(String subject) { endpointBuilder.subject(subject); return this; } + /** + * Set the metadata for the {@link Endpoint} for this ServiceEndpoint replacing any metadata already set. + * @param metadata the metadata + * @return the ServiceEndpoint.Builder + */ + public Builder endpointMetadata(Map metadata) { + endpointBuilder.metadata(metadata); + return this; + } + + /** + * Set the {@link ServiceMessageHandler} for this ServiceEndpoint + * @param handler the handler + * @return the ServiceEndpoint.Builder + */ public Builder handler(ServiceMessageHandler handler) { this.handler = handler; return this; } + /** + * Set the user {@link Dispatcher} for this ServiceEndpoint + * @param dispatcher the dispatcher + * @return the ServiceEndpoint.Builder + */ public Builder dispatcher(Dispatcher dispatcher) { this.dispatcher = dispatcher; return this; } + /** + * Set the {@link EndpointStats} data supplier for this ServiceEndpoint + * @param statsDataSupplier the data supplier + * @return the ServiceEndpoint.Builder + */ public Builder statsDataSupplier(Supplier statsDataSupplier) { this.statsDataSupplier = statsDataSupplier; return this; } + /** + * Build the ServiceEndpoint instance. + * @return the ServiceEndpoint instance + */ public ServiceEndpoint build() { Endpoint endpoint = endpointBuilder.build(); Validator.required(handler, "Message Handler"); diff --git a/src/main/java/io/nats/service/ServiceMessage.java b/src/main/java/io/nats/service/ServiceMessage.java index afa2f0846..3e8588927 100644 --- a/src/main/java/io/nats/service/ServiceMessage.java +++ b/src/main/java/io/nats/service/ServiceMessage.java @@ -15,24 +15,25 @@ import io.nats.client.Connection; import io.nats.client.Message; -import io.nats.client.Subscription; -import io.nats.client.impl.AckType; import io.nats.client.impl.Headers; -import io.nats.client.impl.NatsJetStreamMetaData; import io.nats.client.impl.NatsMessage; import io.nats.client.support.JsonSerializable; -import io.nats.client.support.Status; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.concurrent.TimeoutException; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Service Message is service specific object that exposes the service relevant parts of a NATS Message. */ -public class ServiceMessage implements Message { +public class ServiceMessage { + /** + * Standard header name used to report the text of an error + */ public static final String NATS_SERVICE_ERROR = "Nats-Service-Error"; + + /** + * Standard header name used to report the code of an error + */ public static final String NATS_SERVICE_ERROR_CODE = "Nats-Service-Error-Code"; private final Message message; @@ -41,146 +42,110 @@ public class ServiceMessage implements Message { this.message = message; } + /** + * Respond to a service request message. + * @param conn the NATS connection + * @param response the response payload in the form of a byte array + */ public void respond(Connection conn, byte[] response) { conn.publish(message.getReplyTo(), response); } + /** + * Respond to a service request message. + * @param conn the NATS connection + * @param response the response payload in the form of a string + */ public void respond(Connection conn, String response) { conn.publish(message.getReplyTo(), response.getBytes(StandardCharsets.UTF_8)); } + /** + * Respond to a service request message. + * @param conn the NATS connection + * @param response the response payload in the form of a {@link JsonSerializable} object + */ public void respond(Connection conn, JsonSerializable response) { conn.publish(message.getReplyTo(), response.serialize()); } + /** + * Respond to a service request message with a response and custom headers. + * @param conn the NATS connection + * @param response the response payload in the form of a byte array + * @param headers the custom headers + */ public void respond(Connection conn, byte[] response, Headers headers) { conn.publish(NatsMessage.builder().subject(message.getReplyTo()).data(response).headers(headers).build()); } + /** + * Respond to a service request message with a response and custom headers. + * @param conn the NATS connection + * @param response the response payload in the form of a string + * @param headers the custom headers + */ public void respond(Connection conn, String response, Headers headers) { conn.publish(NatsMessage.builder().subject(message.getReplyTo()).data(response).headers(headers).build()); } + /** + * Respond to a service request message. + * @param conn the NATS connection + * @param response the response payload in the form of a {@link JsonSerializable} object + * @param headers the custom headers + */ public void respond(Connection conn, JsonSerializable response, Headers headers) { conn.publish(NatsMessage.builder().subject(message.getReplyTo()).data(response.serialize()).headers(headers).build()); } - public void respondStandardError(Connection conn, String errorMessage, int errorCode) { + /** + * Respond to a service request message with a standard error. + * @param conn the NATS connection + * @param errorText the error message text + * @param errorCode the error message code + */ + public void respondStandardError(Connection conn, String errorText, int errorCode) { conn.publish(NatsMessage.builder() .subject(message.getReplyTo()) .headers(new Headers() - .put(NATS_SERVICE_ERROR, errorMessage) + .put(NATS_SERVICE_ERROR, errorText) .put(NATS_SERVICE_ERROR_CODE, "" + errorCode)) .build()); } - @Override + /** + * @return the subject that this message was sent to + */ public String getSubject() { return message.getSubject(); } - @Override + /** + * @return the subject the application is expected to send a reply message on + */ public String getReplyTo() { return message.getReplyTo(); } - @Override + /** + * @return true if there are headers + */ public boolean hasHeaders() { return message.hasHeaders(); } - @Override + /** + * @return the headers object for the message + */ public Headers getHeaders() { return message.getHeaders(); } - @Override - public boolean isStatusMessage() { - return message.isStatusMessage(); - } - - @Override - public Status getStatus() { - return message.getStatus(); - } - - @Override + /** + * @return the data from the message + */ public byte[] getData() { return message.getData(); } - - @Override - public boolean isUtf8mode() { - return message.isUtf8mode(); - } - - @Override - public Subscription getSubscription() { - return message.getSubscription(); - } - - @Override - public String getSID() { - return message.getSID(); - } - - @Override - public Connection getConnection() { - return message.getConnection(); - } - - @Override - public NatsJetStreamMetaData metaData() { - return message.metaData(); - } - - @Override - public AckType lastAck() { - return message.lastAck(); - } - - @Override - public void ack() { - message.ack(); - } - - @Override - public void ackSync(Duration timeout) throws TimeoutException, InterruptedException { - message.ackSync(timeout); - } - - @Override - public void nak() { - message.nak(); - } - - @Override - public void nakWithDelay(Duration nakDelay) { - message.nakWithDelay(nakDelay); - } - - @Override - public void nakWithDelay(long nakDelayMillis) { - message.nakWithDelay(nakDelayMillis); - } - - @Override - public void term() { - message.term(); - } - - @Override - public void inProgress() { - message.inProgress(); - } - - @Override - public boolean isJetStream() { - return message.isJetStream(); - } - - @Override - public long consumeByteCount() { - return message.consumeByteCount(); - } } diff --git a/src/main/java/io/nats/service/ServiceMessageHandler.java b/src/main/java/io/nats/service/ServiceMessageHandler.java index ffcaab921..2d02f932a 100644 --- a/src/main/java/io/nats/service/ServiceMessageHandler.java +++ b/src/main/java/io/nats/service/ServiceMessageHandler.java @@ -13,9 +13,12 @@ package io.nats.service; +/** + * Interface used to receive service request message. + */ public interface ServiceMessageHandler { /** - * Called to deliver a service message to the handler. + * Called to deliver a service request message to the handler. * @param smsg the service message */ void onMessage(ServiceMessage smsg); diff --git a/src/main/java/io/nats/service/ServiceResponse.java b/src/main/java/io/nats/service/ServiceResponse.java index fa36dd037..9e8bdba56 100644 --- a/src/main/java/io/nats/service/ServiceResponse.java +++ b/src/main/java/io/nats/service/ServiceResponse.java @@ -15,6 +15,7 @@ import io.nats.client.support.*; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -24,7 +25,7 @@ import static io.nats.client.support.JsonValueUtils.readStringStringMap; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Base class for service responses Info, Ping and Stats */ public abstract class ServiceResponse implements JsonSerializable { protected final String type; @@ -42,11 +43,7 @@ protected ServiceResponse(String type, String id, String name, String version, M } protected ServiceResponse(String type, ServiceResponse template) { - this.type = type; - this.id = template.id; - this.name = template.name; - this.version = template.version; - this.metadata = template.metadata; + this(type, template.id, template.name, template.version, template.metadata); } protected ServiceResponse(String type, JsonValue jv) { @@ -82,19 +79,19 @@ public String getType() { } /** - * The kind of the service reporting the status - * @return the service name + * The unique ID of the service + * @return the service id */ - public String getName() { - return name; + public String getId() { + return id; } /** - * The unique ID of the service reporting the status - * @return the service id + * The name of the service + * @return the service name */ - public String getId() { - return id; + public String getName() { + return name; } /** @@ -106,11 +103,11 @@ public String getVersion() { } /** - * Metadata for the service - * @return the metadata or null if there is no metadata + * A copy of the metadata for the service, or null if there is no metadata + * @return the metadata */ public Map getMetadata() { - return metadata; + return metadata == null ? null : new HashMap<>(metadata); } protected void subToJson(StringBuilder sb) {} diff --git a/src/main/java/io/nats/service/StatsResponse.java b/src/main/java/io/nats/service/StatsResponse.java index 1462f951f..2723d3a4a 100644 --- a/src/main/java/io/nats/service/StatsResponse.java +++ b/src/main/java/io/nats/service/StatsResponse.java @@ -26,42 +26,85 @@ import static io.nats.client.support.JsonValueUtils.readValue; /** - * SERVICE IS AN EXPERIMENTAL API SUBJECT TO CHANGE + * Stats response class forms the stats json payload, for example: + * + * { + * "id": "ZP1oVevzLGu4CBORMXKKke", + * "name": "Service1", + * "version": "0.0.1", + * "endpoints": [{ + * "name": "SortEndpointAscending", + * "subject": "sort.ascending", + * "num_requests": 1, + * "processing_time": 538900, + * "average_processing_time": 538900, + * "started": "2023-08-15T13:51:41.318000000Z" + * }, { + * "name": "SortEndpointDescending", + * "subject": "sort.descending", + * "num_requests": 1, + * "processing_time": 88400, + * "average_processing_time": 88400, + * "started": "2023-08-15T13:51:41.318000000Z" + * }, { + * "name": "EchoEndpoint", + * "subject": "echo", + * "num_requests": 5, + * "processing_time": 1931600, + * "average_processing_time": 386320, + * "data": { + * "idata": 2, + * "sdata": "s-996409223" + * }, + * "started": "2023-08-15T13:51:41.318000000Z" + * }], + * "started": "2023-08-15T13:51:41.319000000Z", + * "type": "io.nats.micro.v1.stats_response" + * } + * */ public class StatsResponse extends ServiceResponse { public static final String TYPE = "io.nats.micro.v1.stats_response"; private final ZonedDateTime started; - private final List endpointStats; + private final List endpointStatsList; - public StatsResponse(ServiceResponse template, ZonedDateTime started, List endpointStats) { + StatsResponse(ServiceResponse template, ZonedDateTime started, List endpointStatsList) { super(TYPE, template); this.started = started; - this.endpointStats = endpointStats; + this.endpointStatsList = endpointStatsList; } - public StatsResponse(byte[] jsonBytes) { + StatsResponse(byte[] jsonBytes) { this(parseMessage(jsonBytes)); } private StatsResponse(JsonValue jv) { super(TYPE, jv); - endpointStats = EndpointResponse.listOf(readValue(jv, ENDPOINTS)); + endpointStatsList = EndpointStats.listOf(readValue(jv, ENDPOINTS)); started = readDate(jv, STARTED); } @Override protected void subToJson(StringBuilder sb) { - JsonUtils.addJsons(sb, ENDPOINTS, endpointStats); + JsonUtils.addJsons(sb, ENDPOINTS, endpointStatsList); JsonUtils.addField(sb, STARTED, started); } + /** + * Get the time the service was started + * @return the start time + */ public ZonedDateTime getStarted() { return started; } - public List getEndpointStats() { - return endpointStats; + /** + * Get the list of {@link EndpointStats} + * @return the list of endpoint stats + */ + public List getEndpointStatsList() { + return endpointStatsList; } @Override @@ -73,14 +116,14 @@ public boolean equals(Object o) { StatsResponse that = (StatsResponse) o; if (!Objects.equals(started, that.started)) return false; - return Objects.equals(endpointStats, that.endpointStats); + return Objects.equals(endpointStatsList, that.endpointStatsList); } @Override public int hashCode() { int result = super.hashCode(); result = 31 * result + (started != null ? started.hashCode() : 0); - result = 31 * result + (endpointStats != null ? endpointStats.hashCode() : 0); + result = 31 * result + (endpointStatsList != null ? endpointStatsList.hashCode() : 0); return result; } } diff --git a/src/test/java/io/nats/client/impl/PingTests.java b/src/test/java/io/nats/client/impl/PingTests.java index 2164659c7..7e5eb7b5a 100644 --- a/src/test/java/io/nats/client/impl/PingTests.java +++ b/src/test/java/io/nats/client/impl/PingTests.java @@ -192,7 +192,7 @@ public void testPingTimerThroughReconnect() throws Exception { NatsStatistics stats = nc.getNatsStatistics(); try { - assertTrue(Connection.Status.CONNECTED == nc.getStatus(), "Connected Status"); + assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); try { Thread.sleep(200); // should get 10+ pings } catch (Exception exp) @@ -205,17 +205,12 @@ public void testPingTimerThroughReconnect() throws Exception { ts.close(); handler.waitForStatusChange(5, TimeUnit.SECONDS); pings = stats.getPings(); - try { - Thread.sleep(200); // should get more pings - } catch (Exception exp) - { - //Ignore - } + Thread.sleep(250); // should get more pings assertTrue(stats.getPings() > pings, "more pings"); Thread.sleep(1000); } finally { nc.close(); - assertTrue(Connection.Status.CLOSED == nc.getStatus(), "Closed Status"); + assertSame(Connection.Status.CLOSED, nc.getStatus(), "Closed Status"); } } } diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 1d4e198c4..6d63ada4a 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -109,7 +109,7 @@ private void _testReconnect(Function ntsSupplier, BiCon handler.prepForStatusChange(Events.RESUBSCRIBED); try (NatsTestServer ts = ntsSupplier.apply(port)) { - standardConnectionWait(nc, handler); + standardConnectionWait(nc, handler, LONG_CONNECTION_WAIT_MS); end = System.nanoTime(); diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 60c614481..e9ce5da53 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -338,6 +338,9 @@ public void testConsumeWithHandler() throws Exception { try (MessageConsumer consumer = consumerContext.consume(handler)) { latch.await(); consumer.stop(); + while (!consumer.isFinished()) { + Thread.sleep(10); + } assertTrue(atomicCount.get() > 500); } }); diff --git a/src/test/java/io/nats/service/ServiceTests.java b/src/test/java/io/nats/service/ServiceTests.java index 006fac662..cd37dca90 100644 --- a/src/test/java/io/nats/service/ServiceTests.java +++ b/src/test/java/io/nats/service/ServiceTests.java @@ -159,12 +159,12 @@ public void testServiceWorkflow() throws Exception { InfoResponse infoResponse2 = service2.getInfoResponse(); StatsResponse statsResponse1 = service1.getStatsResponse(); StatsResponse statsResponse2 = service2.getStatsResponse(); - EndpointResponse[] endpointResponseArray1 = new EndpointResponse[]{ + EndpointStats[] endpointStatsArray1 = new EndpointStats[]{ service1.getEndpointStats(ECHO_ENDPOINT_NAME), service1.getEndpointStats(SORT_ENDPOINT_ASCENDING_NAME), service1.getEndpointStats(SORT_ENDPOINT_DESCENDING_NAME) }; - EndpointResponse[] endpointResponseArray2 = new EndpointResponse[]{ + EndpointStats[] endpointStatsArray2 = new EndpointStats[]{ service2.getEndpointStats(ECHO_ENDPOINT_NAME), service2.getEndpointStats(SORT_ENDPOINT_ASCENDING_NAME), service2.getEndpointStats(SORT_ENDPOINT_DESCENDING_NAME) @@ -183,11 +183,11 @@ public void testServiceWorkflow() throws Exception { // expecting 10 responses across each endpoint between 2 services for (int x = 0; x < 3; x++) { assertEquals(requestCount, - endpointResponseArray1[x].getNumRequests() - + endpointResponseArray2[x].getNumRequests()); + endpointStatsArray1[x].getNumRequests() + + endpointStatsArray2[x].getNumRequests()); assertEquals(requestCount, - statsResponse1.getEndpointStats().get(x).getNumRequests() - + statsResponse2.getEndpointStats().get(x).getNumRequests()); + statsResponse1.getEndpointStatsList().get(x).getNumRequests() + + statsResponse2.getEndpointStatsList().get(x).getNumRequests()); } // discovery - wait at most 500 millis for responses, 5 total responses max @@ -224,10 +224,10 @@ public void testServiceWorkflow() throws Exception { StatsResponse sr = (StatsResponse) response; assertEquals(exp.getStarted(), sr.getStarted()); for (int x = 0; x < 3; x++) { - EndpointResponse er = exp.getEndpointStats().get(x); + EndpointStats er = exp.getEndpointStatsList().get(x); if (!er.getName().equals(ECHO_ENDPOINT_NAME)) { // echo endpoint has data that will vary - assertEquals(er, sr.getEndpointStats().get(x)); + assertEquals(er, sr.getEndpointStatsList().get(x)); } } }; @@ -246,7 +246,7 @@ public void testServiceWorkflow() throws Exception { StatsResponse sr = service1.getStatsResponse(); assertTrue(zdt.isBefore(sr.getStarted())); for (int x = 0; x < 3; x++) { - EndpointResponse er = sr.getEndpointStats().get(x); + EndpointStats er = sr.getEndpointStatsList().get(x); assertEquals(0, er.getNumRequests()); assertEquals(0, er.getNumErrors()); assertEquals(0, er.getProcessingTime()); @@ -592,7 +592,7 @@ public void testHandlerException() throws Exception { assertEquals("java.lang.RuntimeException: handler-problem", m.getHeaders().getFirst(NATS_SERVICE_ERROR)); assertEquals("500", m.getHeaders().getFirst(NATS_SERVICE_ERROR_CODE)); StatsResponse sr = exService.getStatsResponse(); - EndpointResponse er = sr.getEndpointStats().get(0); + EndpointStats er = sr.getEndpointStatsList().get(0); assertEquals(1, er.getNumRequests()); assertEquals(1, er.getNumErrors()); assertEquals("java.lang.RuntimeException: handler-problem", er.getLastError()); @@ -631,22 +631,6 @@ public void testServiceMessage() throws Exception { assertEquals("testServiceMessage", m.getSubject()); assertFalse(m.hasHeaders()); assertNull(m.getHeaders()); - assertFalse(m.isStatusMessage()); - assertNull(m.getStatus()); - m.isUtf8mode(); - assertNotNull(m.getSubscription()); - assertNotNull(m.getSID()); - assertNotNull(m.getConnection()); - assertThrows(IllegalStateException.class, m::metaData); - assertNull(m.lastAck()); - assertDoesNotThrow(() -> m.ackSync(Duration.ofMillis(1))); - m.ack(); - m.nak(); - m.nakWithDelay(Duration.ofMillis(1)); - m.nakWithDelay(1); - m.term(); - m.inProgress(); - assertFalse(m.isJetStream()); // the actual reply m.respondStandardError(nc, "error", 500); break; @@ -753,6 +737,11 @@ public void testEndpointConstruction() { e = new Endpoint(NAME, "foo.*"); assertEquals("foo.*", e.getSubject()); + // coverage + e = new Endpoint(NAME, SUBJECT, metadata); + assertEquals(NAME, e.getName()); + assertEquals(SUBJECT, e.getSubject()); + assertTrue(JsonUtils.mapEquals(metadata, e.getMetadata())); assertThrows(IllegalArgumentException.class, () -> Endpoint.builder().build()); // many names are bad @@ -781,12 +770,12 @@ public void testEndpointConstruction() { @Test public void testEndpointResponseConstruction() { JsonValue data = new JsonValue("data"); - EqualsVerifier.simple().forClass(EndpointResponse.class) + EqualsVerifier.simple().forClass(EndpointStats.class) .withPrefabValues(JsonValue.class, data, JsonValue.NULL) .verify(); ZonedDateTime zdt = DateTimeUtils.gmtNow(); - EndpointResponse er = new EndpointResponse("name", "subject", 0, 0, 0, null, null, zdt); + EndpointStats er = new EndpointStats("name", "subject", 0, 0, 0, null, null, zdt); assertEquals("name", er.getName()); assertEquals("subject", er.getSubject()); assertNull(er.getLastError()); @@ -797,7 +786,7 @@ public void testEndpointResponseConstruction() { assertEquals(0, er.getAverageProcessingTime()); assertEquals(zdt, er.getStarted()); - er = new EndpointResponse("name", "subject", 2, 4, 10, "lastError", data, zdt); + er = new EndpointStats("name", "subject", 2, 4, 10, "lastError", data, zdt); assertEquals("name", er.getName()); assertEquals("subject", er.getSubject()); assertEquals("lastError", er.getLastError()); @@ -818,7 +807,7 @@ public void testEndpointResponseConstruction() { assertTrue(j.contains("\"num_errors\":4")); assertTrue(j.contains("\"processing_time\":10")); assertTrue(j.contains("\"average_processing_time\":5")); - assertEquals(toKey(EndpointResponse.class) + j, er.toString()); + assertEquals(toKey(EndpointStats.class) + j, er.toString()); } @Test @@ -885,6 +874,7 @@ public void testServiceEndpointConstruction() { assertEquals(smh, se.getHandler()); assertEquals(sds, se.getStatsDataSupplier()); assertNull(se.getDispatcher()); + assertNull(se.getGroupName()); se = ServiceEndpoint.builder() .group(g1) @@ -894,6 +884,7 @@ public void testServiceEndpointConstruction() { assertEquals(g1, se.getGroup()); assertEquals(e1, se.getEndpoint()); assertEquals(e1.getName(), se.getName()); + assertEquals(se.getGroup().getName(), se.getGroupName()); assertEquals(g1.getSubject() + DOT + e1.getSubject(), se.getSubject()); assertNull(se.getStatsDataSupplier()); assertNull(se.getDispatcher()); @@ -910,7 +901,7 @@ public void testServiceEndpointConstruction() { se = ServiceEndpoint.builder() .endpoint(e1) - .endpoint(e2) + .endpoint(e2) // last one wins .handler(smh) .build(); assertEquals(e2, se.getEndpoint()); @@ -933,6 +924,22 @@ public void testServiceEndpointConstruction() { assertEquals(e1.getName(), se.getName()); assertEquals(e2.getSubject(), se.getSubject()); + Map metadata = new HashMap<>(); + se = ServiceEndpoint.builder() + .endpoint(e1) + .endpointMetadata(metadata) + .handler(smh) + .build(); + assertNull(se.getMetadata()); + + metadata.put("k", "v"); + se = ServiceEndpoint.builder() + .endpoint(e1) + .endpointMetadata(metadata) + .handler(smh) + .build(); + assertTrue(JsonUtils.mapEquals(metadata, se.getMetadata())); + IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> ServiceEndpoint.builder().build()); assertTrue(iae.getMessage().contains("Endpoint")); @@ -1008,10 +1015,10 @@ public void testServiceResponsesConstruction() { sleep(100); endStarteds[1] = DateTimeUtils.gmtNow(); - List statsList = new ArrayList<>(); + List statsList = new ArrayList<>(); JsonValue[] data = new JsonValue[]{supplyData(), supplyData()}; - statsList.add(new EndpointResponse("endName0", "endSubject0", 1000, 0, 10000, "lastError0", data[0], endStarteds[0])); - statsList.add(new EndpointResponse("endName1", "endSubject1", 2000, 10, 10000, "lastError1", data[1], endStarteds[1])); + statsList.add(new EndpointStats("endName0", "endSubject0", 1000, 0, 10000, "lastError0", data[0], endStarteds[0])); + statsList.add(new EndpointStats("endName1", "endSubject1", 2000, 10, 10000, "lastError1", data[1], endStarteds[1])); StatsResponse stat1 = new StatsResponse(pr1, serviceStarted, statsList); StatsResponse stat2 = new StatsResponse(stat1.toJson().getBytes()); @@ -1021,16 +1028,16 @@ public void testServiceResponsesConstruction() { EqualsVerifier.simple().forClass(PingResponse.class).verify(); EqualsVerifier.simple().forClass(InfoResponse.class).verify(); EqualsVerifier.simple().forClass(StatsResponse.class) - .withPrefabValues(EndpointResponse.class, statsList.get(0), statsList.get(1)) + .withPrefabValues(EndpointStats.class, statsList.get(0), statsList.get(1)) .verify(); } private static void validateApiInOutStatsResponse(StatsResponse stat, ZonedDateTime serviceStarted, ZonedDateTime[] endStarteds, JsonValue[] data) { validateApiInOutServiceResponse(stat, StatsResponse.TYPE); assertEquals(serviceStarted, stat.getStarted()); - assertEquals(2, stat.getEndpointStats().size()); + assertEquals(2, stat.getEndpointStatsList().size()); for (int x = 0; x < 2; x++) { - EndpointResponse e = stat.getEndpointStats().get(x); + EndpointStats e = stat.getEndpointStatsList().get(x); assertEquals("endName" + x, e.getName()); assertEquals("endSubject" + x, e.getSubject()); long nr = x * 1000 + 1000;