diff --git a/build.gradle b/build.gradle index 363053111..f6ae05556 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ repositories { dependencies { implementation 'net.i2p.crypto:eddsa:0.3.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' - testImplementation 'io.nats:jnats-server-runner:1.0.9' + testImplementation 'io.nats:jnats-server-runner:1.0.14' } sourceSets { diff --git a/dependencies.md b/dependencies.md index cb18ddb00..df3e6dce8 100644 --- a/dependencies.md +++ b/dependencies.md @@ -12,7 +12,7 @@ This file lists the dependencies used in this repository. | Dependency | License | |-------------------------------------------------|-----------------------------------------| -| io.nats:jnats-server-runner:1.0.9 | Apache 2.0 License | +| io.nats:jnats-server-runner:1.0.14 | Apache 2.0 License | | org.apiguardian:apiguardian-api:1.1.0 | Apache 2.0 License | | org.junit.jupiter:junit-jupiter:5.9.0 | Eclipse Public License v2.0 | | org.junit:junit-bom:5.9.0 | Eclipse Public License v2.0 | diff --git a/src/main/java/io/nats/client/impl/IncomingMessage.java b/src/main/java/io/nats/client/impl/IncomingMessage.java new file mode 100644 index 000000000..a108f0af2 --- /dev/null +++ b/src/main/java/io/nats/client/impl/IncomingMessage.java @@ -0,0 +1,32 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +public class IncomingMessage extends NatsMessage { + IncomingMessage() {} + + IncomingMessage(byte[] data) { + super(data); + } + + @Override + byte[] getProtocolBytes() { + throw new IllegalStateException("getProtocolBytes not supported for this type of message."); + } + + @Override + int getControlLineLength() { + throw new IllegalStateException("getControlLineLength not supported for this type of message."); + } +} diff --git a/src/main/java/io/nats/client/impl/IncomingMessageFactory.java b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java new file mode 100644 index 000000000..58146aa03 --- /dev/null +++ b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java @@ -0,0 +1,76 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.IncomingHeadersProcessor; +import io.nats.client.support.Status; + +import static io.nats.client.support.NatsJetStreamConstants.JS_ACK_SUBJECT_PREFIX; + +// ---------------------------------------------------------------------------------------------------- +// Incoming Message Factory - internal use only +// ---------------------------------------------------------------------------------------------------- +class IncomingMessageFactory { + private final String sid; + private final String subject; + private final String replyTo; + private final int protocolLineLength; + private final boolean utf8mode; + + private byte[] data; + private Headers headers; + private Status status; + private int headerLen; + + // Create an incoming message for a subscriber + // Doesn't check control line size, since the server sent us the message + IncomingMessageFactory(String sid, String subject, String replyTo, int protocolLength, boolean utf8mode) { + this.sid = sid; + this.subject = subject; + this.replyTo = replyTo; + this.protocolLineLength = protocolLength; + this.utf8mode = utf8mode; + } + + void setHeaders(IncomingHeadersProcessor ihp) { + headers = ihp.getHeaders(); + status = ihp.getStatus(); + headerLen = ihp.getSerializedLength(); + } + + void setData(byte[] data) { + this.data = data; + } + + NatsMessage getMessage() { + NatsMessage message; + if (status != null) { + message = new StatusMessage(status); + } + else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) { + message = new NatsJetStreamMessage(data); + } + else { + message = new IncomingMessage(data); + } + message.sid = sid; + message.subject = subject; + message.replyTo = replyTo; + message.headers = headers; + message.headerLen = headerLen; + message.utf8mode = utf8mode; + message.sizeInBytes = protocolLineLength + headerLen + message.dataLen + 4; // Two CRLFs + return message; + } +} diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 03195a427..96d120b34 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -16,7 +16,6 @@ import io.nats.client.*; import io.nats.client.ConnectionListener.Events; import io.nats.client.api.ServerInfo; -import io.nats.client.impl.NatsMessage.ProtocolMessage; import io.nats.client.support.ByteArrayBuilder; import io.nats.client.support.NatsRequestCompletableFuture; import io.nats.client.support.Validator; @@ -1286,9 +1285,9 @@ CompletableFuture sendPing(boolean treatAsInternal) { pongQueue.add(pongFuture); if (treatAsInternal) { - queueInternalOutgoing(new ProtocolMessage(OP_PING_BYTES)); + queueInternalOutgoing(new ProtocolMessage(PING_PROTO)); } else { - queueOutgoing(new ProtocolMessage(OP_PING_BYTES)); + queueOutgoing(new ProtocolMessage(PING_PROTO)); } this.needPing.set(true); @@ -1296,8 +1295,17 @@ CompletableFuture sendPing(boolean treatAsInternal) { return pongFuture; } + // This is a minor speed / memory enhancement. + // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state + // But it is safe to share the data bytes and the size since those fields are just being read + // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size + // reducing allocation of data for something that is often created and used + // These static instances are the once that are used for copying, sendPing and sendPong + private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES); + private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES); + void sendPong() { - queueInternalOutgoing(new ProtocolMessage(OP_PONG_BYTES)); + queueInternalOutgoing(new ProtocolMessage(PONG_PROTO)); } // Called by the reader diff --git a/src/main/java/io/nats/client/impl/NatsConnectionReader.java b/src/main/java/io/nats/client/impl/NatsConnectionReader.java index 971a680df..d3d151343 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionReader.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionReader.java @@ -13,7 +13,6 @@ package io.nats.client.impl; -import io.nats.client.impl.NatsMessage.InternalMessageFactory; import io.nats.client.support.IncomingHeadersProcessor; import java.io.IOException; @@ -46,21 +45,21 @@ enum Mode { private boolean gotCR; private String op; - private char[] opArray; + private final char[] opArray; private int opPos; - private char[] msgLineChars; + private final char[] msgLineChars; private int msgLinePosition; private Mode mode; - private InternalMessageFactory incoming; + private IncomingMessageFactory incoming; private byte[] msgHeaders; private byte[] msgData; private int msgHeadersPosition; private int msgDataPosition; - private byte[] buffer; + private final byte[] buffer; private int bufferPosition; private Future stopped; @@ -418,7 +417,7 @@ static String opFor(char[] chars, int length) { } } - private static int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}; + private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}; public static int parseLength(String s) throws NumberFormatException { int length = s.length(); @@ -433,7 +432,7 @@ public static int parseLength(String s) throws NumberFormatException { int d = (c - '0'); if (d>9) { - throw new NumberFormatException("Invalid char in message length \'" + c + "\'"); + throw new NumberFormatException("Invalid char in message length '" + c + "'"); } retVal += d * TENS[length - i - 1]; @@ -476,7 +475,7 @@ void parseProtocolMessage() throws IOException { int incomingLength = parseLength(lengthChars); - this.incoming = new InternalMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode); + this.incoming = new IncomingMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode); this.mode = Mode.GATHER_DATA; this.msgData = new byte[incomingLength]; this.msgDataPosition = 0; @@ -518,7 +517,7 @@ void parseProtocolMessage() throws IOException { throw new IllegalStateException("Bad HMSG control line, missing required fields"); } - this.incoming = new InternalMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode); + this.incoming = new IncomingMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode); this.msgHeaders = new byte[hdrLen]; this.msgData = new byte[totLen - hdrLen]; this.mode = Mode.GATHER_HEADERS; @@ -532,7 +531,7 @@ void parseProtocolMessage() throws IOException { this.mode = Mode.GATHER_OP; break; case OP_ERR: - String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("\'", ""); + String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", ""); this.connection.processError(errorText); this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java b/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java index 54e9aac71..a57a312fa 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java @@ -14,7 +14,6 @@ package io.nats.client.impl; import io.nats.client.Connection; -import io.nats.client.impl.NatsMessage.InternalMessage; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -23,11 +22,13 @@ import static io.nats.client.support.NatsConstants.NANOS_PER_MILLI; import static io.nats.client.support.Validator.validateDurationRequired; -class NatsJetStreamMessage extends InternalMessage { +class NatsJetStreamMessage extends IncomingMessage { private NatsJetStreamMetaData jsMetaData = null; - NatsJetStreamMessage() {} + NatsJetStreamMessage(byte[] data) { + super(data); + } /** * {@inheritDoc} diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index 71a35f348..dc9c69a62 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -17,7 +17,6 @@ import io.nats.client.Message; import io.nats.client.Subscription; import io.nats.client.support.ByteArrayBuilder; -import io.nats.client.support.IncomingHeadersProcessor; import io.nats.client.support.Status; import java.nio.charset.Charset; @@ -26,7 +25,6 @@ import java.util.concurrent.TimeoutException; import static io.nats.client.support.NatsConstants.*; -import static io.nats.client.support.NatsJetStreamConstants.JS_ACK_SUBJECT_PREFIX; import static io.nats.client.support.Validator.validateReplyTo; import static io.nats.client.support.Validator.validateSubject; import static java.nio.charset.StandardCharsets.US_ASCII; @@ -44,18 +42,15 @@ public class NatsMessage implements Message { // incoming specific : subject, replyTo, data and these fields protected String sid; - protected int protocolLineLength; + protected int controlLineLength; // protocol specific : just this field protected ByteArrayBuilder protocolBab; // housekeeping protected int sizeInBytes = -1; - protected int hdrLen = 0; - protected int dataLen = 0; - protected int totLen = 0; - - protected boolean dirty = false; + protected int headerLen = 0; + protected int dataLen; protected NatsSubscription subscription; @@ -66,119 +61,97 @@ public class NatsMessage implements Message { // ---------------------------------------------------------------------------------------------------- // Constructors - Prefer to use Builder // ---------------------------------------------------------------------------------------------------- - private NatsMessage() { - this.data = EMPTY_BODY; + protected NatsMessage() { + this((byte[])null); } - private NatsMessage(byte[] data) { + protected NatsMessage(byte[] data) { this.data = data == null ? EMPTY_BODY : data; + dataLen = this.data.length; } @Deprecated // Plans are to remove allowing utf8-mode public NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) { - this(subject, replyTo, null, data, utf8mode); - } - - public NatsMessage(String subject, String replyTo, byte[] data) { - this(subject, replyTo, null, data, false); - } - - public NatsMessage(Message message) { - this(message.getSubject(), - message.getReplyTo(), - message.getHeaders(), - message.getData(), - message.isUtf8mode()); + this(subject, replyTo, null, data); + this.utf8mode = utf8mode; } - @Deprecated // Plans are to remove allowing utf8-mode public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, boolean utf8mode) { this(subject, replyTo, headers, data); this.utf8mode = utf8mode; } + public NatsMessage(String subject, String replyTo, byte[] data) { + this(subject, replyTo, null, data); + } + public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) { this(data); this.subject = validateSubject(subject, true); this.replyTo = validateReplyTo(replyTo, false); this.headers = headers; this.utf8mode = false; + finishConstruct(); + } - dirty = true; + public NatsMessage(Message message) { + this(message.getData()); + this.subject = message.getSubject(); + this.replyTo = message.getReplyTo(); + this.headers = message.getHeaders(); + this.utf8mode = message.isUtf8mode(); + finishConstruct(); } - // ---------------------------------------------------------------------------------------------------- - // Only for implementors. The user created message is the only current one that calculates. - // ---------------------------------------------------------------------------------------------------- - protected boolean calculateIfDirty() { - if (dirty || (hasHeaders() && headers.isDirty())) { - int replyToLen = replyTo == null ? 0 : replyTo.length(); - dataLen = data.length; + protected void finishConstruct() { + int replyToLen = replyTo == null ? 0 : replyTo.length(); - if (headers != null && !headers.isEmpty()) { - hdrLen = headers.serializedLength(); - } - else { - hdrLen = 0; - } - totLen = hdrLen + dataLen; + if (headers != null && !headers.isEmpty()) { + headerLen = headers.serializedLength(); + } + else { + headerLen = 0; + } + int headerAndDataLen = headerLen + dataLen; - // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases - // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (hdrLen + dataLen) - ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + totLen); + // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases + // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (headerLen + dataLen) + ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + headerAndDataLen); - // protocol come first - if (hdrLen > 0) { - bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN); - } - else { - bab.append(PUB_SP_BYTES, 0, PUB_SP_BYTES_LEN); - } + // protocol come first + if (headerLen > 0) { + bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN); + } + else { + bab.append(PUB_SP_BYTES, 0, PUB_SP_BYTES_LEN); + } - // next comes the subject - bab.append(subject.getBytes(UTF_8)).append(SP); + // next comes the subject + bab.append(subject.getBytes(UTF_8)).append(SP); - // reply to if it's there - if (replyToLen > 0) { - bab.append(replyTo.getBytes(UTF_8)).append(SP); - } + // reply to if it's there + if (replyToLen > 0) { + bab.append(replyTo.getBytes(UTF_8)).append(SP); + } - // header length if there are headers - if (hdrLen > 0) { - bab.append(Integer.toString(hdrLen).getBytes(US_ASCII)).append(SP); - } + // header length if there are headers + if (headerLen > 0) { + bab.append(Integer.toString(headerLen).getBytes(US_ASCII)).append(SP); + } - // payload length - bab.append(Integer.toString(totLen).getBytes(US_ASCII)); + // payload length + bab.append(Integer.toString(headerAndDataLen).getBytes(US_ASCII)); - protocolBab = bab; - dirty = false; - return true; - } - return false; + protocolBab = bab; + controlLineLength = protocolBab.length() + 2; // One CRLF. This is just how controlLineLength is defined. + sizeInBytes = controlLineLength + headerAndDataLen + 2; // The 2nd CRLFs } // ---------------------------------------------------------------------------------------------------- // Client and Message Internal Methods // ---------------------------------------------------------------------------------------------------- long getSizeInBytes() { - if (calculateIfDirty() || sizeInBytes == -1) { - sizeInBytes = protocolLineLength; - if (protocolBab != null) { - sizeInBytes += protocolBab.length(); - } - sizeInBytes += 2; // CRLF - if (!isProtocol()) { - if (hdrLen > 0) { - sizeInBytes += hdrLen; - } - if (dataLen > 0) { - sizeInBytes += dataLen; - } - sizeInBytes += 2; // CRLF - } - } return sizeInBytes; } @@ -187,18 +160,11 @@ boolean isProtocol() { } byte[] getProtocolBytes() { - calculateIfDirty(); return protocolBab.toByteArray(); } - ByteArrayBuilder getProtocolBab() { - calculateIfDirty(); - return protocolBab; - } - int getControlLineLength() { - calculateIfDirty(); - return (protocolBab != null) ? protocolBab.length() + 2 : -1; + return controlLineLength; } Headers getOrCreateHeaders() { @@ -405,7 +371,6 @@ public String toString() { } String toDetailString() { - calculateIfDirty(); return "NatsMessage:" + "\n subject='" + subject + '\'' + "\n replyTo='" + replyToString() + '\'' + @@ -413,12 +378,10 @@ String toDetailString() { "\n utf8mode=" + utf8mode + "\n headers=" + headersToString() + "\n sid='" + sid + '\'' + - "\n protocolLineLength=" + protocolLineLength + "\n protocolBytes=" + protocolBytesToString() + "\n sizeInBytes=" + sizeInBytes + - "\n hdrLen=" + hdrLen + + "\n headerLen=" + headerLen + "\n dataLen=" + dataLen + - "\n totLen=" + totLen + "\n subscription=" + subscription + "\n next=" + nextToString(); @@ -563,135 +526,4 @@ public NatsMessage build() { return new NatsMessage(subject, replyTo, headers, data, utf8mode); } } - - // ---------------------------------------------------------------------------------------------------- - // Incoming Message Factory - internal use only - // ---------------------------------------------------------------------------------------------------- - static class InternalMessageFactory { - private final String sid; - private final String subject; - private final String replyTo; - private final int protocolLineLength; - private final boolean utf8mode; - - private byte[] data; - private Headers headers; - private Status status; - private int hdrLen = 0; - private int dataLen = 0; - private int totLen = 0; - - // Create an incoming message for a subscriber - // Doesn't check control line size, since the server sent us the message - InternalMessageFactory(String sid, String subject, String replyTo, int protocolLength, boolean utf8mode) { - this.sid = sid; - this.subject = subject; - this.replyTo = replyTo; - this.protocolLineLength = protocolLength; - this.utf8mode = utf8mode; - // headers and data are set later and sizes are calculated during those setters - } - - void setHeaders(IncomingHeadersProcessor ihp) { - headers = ihp.getHeaders(); - status = ihp.getStatus(); - hdrLen = ihp.getSerializedLength(); - totLen = hdrLen + dataLen; - } - - void setData(byte[] data) { - this.data = data; - dataLen = data == null ? 0 : data.length; - totLen = hdrLen + dataLen; - } - - NatsMessage getMessage() { - NatsMessage message = null; - if (status != null) { - message = new StatusMessage(status); - } - else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) { - message = new NatsJetStreamMessage(); - } - if (message == null) { - message = new InternalMessage(); - } - message.sid = this.sid; - message.subject = this.subject; - message.replyTo = this.replyTo; - message.protocolLineLength = this.protocolLineLength; - message.headers = this.headers; - message.data = this.data == null ? EMPTY_BODY : this.data; - message.utf8mode = this.utf8mode; - message.hdrLen = this.hdrLen; - message.dataLen = this.dataLen; - message.totLen = this.totLen; - - return message; - } - } - - static class InternalMessage extends NatsMessage { - @Override - protected boolean calculateIfDirty() { - return false; - } - } - - private static final ByteArrayBuilder EMPTY_BAB = new ByteArrayBuilder(); - - static class ProtocolMessage extends InternalMessage { - ProtocolMessage(byte[] protocol) { - this.protocolBab = protocol == null ? EMPTY_BAB : new ByteArrayBuilder(protocol); - } - - ProtocolMessage(ByteArrayBuilder babProtocol) { - protocolBab = babProtocol; - } - - ProtocolMessage(String asciiProtocol) { - protocolBab = new ByteArrayBuilder().append(asciiProtocol); - } - - @Override - byte[] getProtocolBytes() { - return protocolBab.toByteArray(); - } - - @Override - ByteArrayBuilder getProtocolBab() { - return protocolBab; - } - - @Override - boolean isProtocol() { - return true; - } - } - - static class StatusMessage extends InternalMessage { - private final Status status; - - public StatusMessage(Status status) { - this.status = status; - } - - @Override - public boolean isStatusMessage() { - return true; - } - - @Override - public Status getStatus() { - return status; - } - - @Override - public String toString() { - return "StatusMessage{" + - "code=" + status.getCode() + - ", message='" + status.getMessage() + '\'' + - '}'; - } - } } diff --git a/src/main/java/io/nats/client/impl/ProtocolMessage.java b/src/main/java/io/nats/client/impl/ProtocolMessage.java new file mode 100644 index 000000000..5bad98a5d --- /dev/null +++ b/src/main/java/io/nats/client/impl/ProtocolMessage.java @@ -0,0 +1,42 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.ByteArrayBuilder; + +// ---------------------------------------------------------------------------------------------------- +// Protocol message is a special version of a NatsMessage +// ---------------------------------------------------------------------------------------------------- +class ProtocolMessage extends NatsMessage { + private static final ByteArrayBuilder EMPTY_BAB = new ByteArrayBuilder(); + + ProtocolMessage(ByteArrayBuilder babProtocol) { + protocolBab = babProtocol; + sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data + } + + ProtocolMessage(byte[] protocol) { + this(protocol == null ? EMPTY_BAB : new ByteArrayBuilder(protocol)); + } + + @Override + boolean isProtocol() { + return true; + } + + ProtocolMessage(ProtocolMessage pm) { + protocolBab = pm.protocolBab; + sizeInBytes = pm.sizeInBytes; + } +} diff --git a/src/main/java/io/nats/client/impl/StatusMessage.java b/src/main/java/io/nats/client/impl/StatusMessage.java new file mode 100644 index 000000000..5cd313ef9 --- /dev/null +++ b/src/main/java/io/nats/client/impl/StatusMessage.java @@ -0,0 +1,42 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.Status; + +public class StatusMessage extends IncomingMessage { + private final Status status; + + StatusMessage(Status status) { + this.status = status; + } + + @Override + public boolean isStatusMessage() { + return true; + } + + @Override + public Status getStatus() { + return status; + } + + @Override + public String toString() { + return "StatusMessage{" + + "code=" + status.getCode() + + ", message='" + status.getMessage() + '\'' + + '}'; + } +} diff --git a/src/test/java/io/nats/client/NatsTestServer.java b/src/test/java/io/nats/client/NatsTestServer.java index f5233bc5f..e151846ac 100644 --- a/src/test/java/io/nats/client/NatsTestServer.java +++ b/src/test/java/io/nats/client/NatsTestServer.java @@ -1,4 +1,4 @@ -// Copyright 2015-2020 The NATS Authors +// Copyright 2015-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at: @@ -17,8 +17,13 @@ import nats.io.NatsServerRunner; import java.io.IOException; +import java.util.logging.Level; public class NatsTestServer extends NatsServerRunner { + static { + NatsServerRunner.setLoggingLevel(Level.SEVERE); + } + public NatsTestServer() throws IOException { super(); } @@ -71,7 +76,15 @@ public static int nextPort() throws IOException { return NatsRunnerUtils.nextPort(); } - public static String getURIForPort(int port) { - return NatsRunnerUtils.getURIForPort(port); + public String getLocalhostUri(String schema) { + return NatsRunnerUtils.getLocalhostUri(schema, getPort()); + } + + public static String getNatsLocalhostUri(int port) { + return NatsRunnerUtils.getNatsLocalhostUri(port); + } + + public static String getLocalhostUri(String schema, int port) { + return NatsRunnerUtils.getLocalhostUri(schema, port); } -} \ No newline at end of file +} diff --git a/src/test/java/io/nats/client/impl/ErrorListenerTests.java b/src/test/java/io/nats/client/impl/ErrorListenerTests.java index dc2594a90..2862e1595 100644 --- a/src/test/java/io/nats/client/impl/ErrorListenerTests.java +++ b/src/test/java/io/nats/client/impl/ErrorListenerTests.java @@ -26,8 +26,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import static io.nats.client.utils.TestBase.standardCloseConnection; -import static io.nats.client.utils.TestBase.standardConnection; +import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; public class ErrorListenerTests { @@ -130,6 +129,7 @@ public void testErrorOnNoAuth() throws Exception { String[] customArgs = {"--user", "stephen", "--pass", "password"}; TestHandler handler = new TestHandler(); try (NatsTestServer ts = new NatsTestServer(customArgs, false)) { + sleep(100); // give the server time to get ready, otherwise sometimes this test flaps // See config file for user/pass Options options = new Options.Builder(). server(ts.getURI()) diff --git a/src/test/java/io/nats/client/impl/HeadersTests.java b/src/test/java/io/nats/client/impl/HeadersTests.java index 267618db6..1c447c0b5 100644 --- a/src/test/java/io/nats/client/impl/HeadersTests.java +++ b/src/test/java/io/nats/client/impl/HeadersTests.java @@ -561,7 +561,7 @@ private IncomingHeadersProcessor assertValidStatus(IncomingHeadersProcessor ihp, if (msg != null) { assertEquals(msg, status.getMessage()); } - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory("sid", "sub", "rt", 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory("sid", "sub", "rt", 0, false); imf.setHeaders(ihp); assertTrue(imf.getMessage().isStatusMessage()); return ihp; diff --git a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java index 6a4252ebb..6bcddb51b 100644 --- a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java @@ -888,7 +888,7 @@ public void testInternalLookupConsumerInfoCoverage() throws Exception { @Test public void testGetJetStreamValidatedConnectionCoverage() { - NatsJetStreamMessage njsm = new NatsJetStreamMessage(); + NatsJetStreamMessage njsm = new NatsJetStreamMessage(null); IllegalStateException ise = assertThrows(IllegalStateException.class, njsm::getJetStreamValidatedConnection); assertTrue(ise.getMessage().contains("subscription")); diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index ffd0c1032..fd50d6f4f 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -67,11 +67,11 @@ public NatsMessage getTestJsMessage(long seq, String sid) { } public NatsMessage getTestMessage(String replyTo) { - return new NatsMessage.InternalMessageFactory(mockSid(), "subj", replyTo, 0, false).getMessage(); + return new IncomingMessageFactory(mockSid(), "subj", replyTo, 0, false).getMessage(); } public NatsMessage getTestMessage(String replyTo, String sid) { - return new NatsMessage.InternalMessageFactory(sid, "subj", replyTo, 0, false).getMessage(); + return new IncomingMessageFactory(sid, "subj", replyTo, 0, false).getMessage(); } static class NoopMessageManager extends MessageManager {} diff --git a/src/test/java/io/nats/client/impl/MessageManagerTests.java b/src/test/java/io/nats/client/impl/MessageManagerTests.java index 2bfcaa4a0..1a92bbea8 100644 --- a/src/test/java/io/nats/client/impl/MessageManagerTests.java +++ b/src/test/java/io/nats/client/impl/MessageManagerTests.java @@ -366,7 +366,7 @@ private PushMessageManager getManager(Connection conn, SubscribeOptions so, Nats } private NatsMessage getFlowControl(int replyToId, String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(sid, "subj", getFcSubject(replyToId), 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(sid, "subj", getFcSubject(replyToId), 0, false); imf.setHeaders(new IncomingHeadersProcessor(("NATS/1.0 " + FLOW_OR_HEARTBEAT_STATUS_CODE + " " + FLOW_CONTROL_TEXT + "\r\n").getBytes())); return imf.getMessage(); } @@ -376,14 +376,14 @@ private String getFcSubject(int id) { } private NatsMessage getFcHeartbeat(int replyToId, String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(sid, "subj", null, 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(sid, "subj", null, 0, false); String s = "NATS/1.0 " + FLOW_OR_HEARTBEAT_STATUS_CODE + " " + HEARTBEAT_TEXT + "\r\n" + CONSUMER_STALLED_HDR + ":" + getFcSubject(replyToId) + "\r\n\r\n"; imf.setHeaders(new IncomingHeadersProcessor(s.getBytes())); return imf.getMessage(); } private NatsMessage getHeartbeat(String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(mockSid(), "subj", null, 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(mockSid(), "subj", null, 0, false); String s = "NATS/1.0 " + FLOW_OR_HEARTBEAT_STATUS_CODE + " " + HEARTBEAT_TEXT + "\r\n"; imf.setHeaders(new IncomingHeadersProcessor(s.getBytes())); return imf.getMessage(); @@ -402,7 +402,7 @@ private NatsMessage getUnkStatus(String sid) { } private NatsMessage getStatus(int code, String message, String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(sid, "subj", null, 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(sid, "subj", null, 0, false); imf.setHeaders(new IncomingHeadersProcessor(("NATS/1.0 " + code + " " + message + "\r\n").getBytes())); return imf.getMessage(); } diff --git a/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java b/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java index a757c263d..0b7ac6cc3 100644 --- a/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java +++ b/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java @@ -54,7 +54,7 @@ public static void main(String args[]) throws InterruptedException { start = System.nanoTime(); for (int j = 0; j < msgCount; j++) { - new NatsMessage.ProtocolMessage(EMPTY_BODY); + new ProtocolMessage(EMPTY_BODY); } end = System.nanoTime(); diff --git a/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java b/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java index 8e7cd190d..16ada6c7d 100644 --- a/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java +++ b/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java @@ -29,7 +29,7 @@ public static void main(String args[]) throws InterruptedException { MessageQueue warm = new MessageQueue(false); for (int j = 0; j < msgCount; j++) { - msgs[j] = new NatsMessage.ProtocolMessage(warmBytes); + msgs[j] = new ProtocolMessage(warmBytes); warm.push(msgs[j]); } diff --git a/src/test/java/io/nats/client/impl/MessageQueueTests.java b/src/test/java/io/nats/client/impl/MessageQueueTests.java index 854bc8530..d00f3ac19 100644 --- a/src/test/java/io/nats/client/impl/MessageQueueTests.java +++ b/src/test/java/io/nats/client/impl/MessageQueueTests.java @@ -13,7 +13,6 @@ package io.nats.client.impl; -import io.nats.client.impl.NatsMessage.ProtocolMessage; import org.junit.jupiter.api.Test; import java.io.UnsupportedEncodingException; diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index 99034a095..afd29bb3e 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -30,7 +30,7 @@ public class NatsMessageTests { @Test public void testSizeOnProtocolMessage() { - NatsMessage msg = new NatsMessage.ProtocolMessage("PING"); + NatsMessage msg = new ProtocolMessage("PING".getBytes()); assertEquals(msg.getProtocolBytes().length + 2, msg.getSizeInBytes(), "Size is set, with CRLF"); assertEquals("PING".getBytes(StandardCharsets.UTF_8).length + 2, msg.getSizeInBytes(), "Size is correct"); assertTrue(msg.toString().endsWith("PING")); // toString COVERAGE @@ -245,37 +245,26 @@ public void miscCoverage() { assertNotNull(m.getHeaders()); m.headers = null; // we can do this because we have package access - m.dirty = true; // for later tests, also is true b/c we nerfed the headers assertFalse(m.hasHeaders()); assertNull(m.getHeaders()); assertNotNull(m.toString()); // COVERAGE assertNotNull(m.getOrCreateHeaders()); - NatsMessage.ProtocolMessage pm = new NatsMessage.ProtocolMessage((byte[])null); + ProtocolMessage pm = new ProtocolMessage((byte[])null); assertNotNull(pm.protocolBab); assertEquals(0, pm.protocolBab.length()); - NatsMessage.InternalMessage scm = new NatsMessage.InternalMessage() {}; + IncomingMessage scm = new IncomingMessage() {}; assertNull(scm.protocolBab); - assertEquals(-1, scm.getControlLineLength()); + assertThrows(IllegalStateException.class, scm::getProtocolBytes); + assertThrows(IllegalStateException.class, scm::getControlLineLength); // coverage coverage coverage + //noinspection deprecation NatsMessage nmCov = new NatsMessage("sub", "reply", null, true); assertTrue(nmCov.isUtf8mode()); - nmCov.dirty = false; - nmCov.calculateIfDirty(); - - nmCov.dirty = false; - nmCov.headers = new Headers().add("foo", "bar"); - nmCov.calculateIfDirty(); - - nmCov.dirty = false; - nmCov.headers = new Headers().add("foo", "bar"); - nmCov.headers.getSerialized(); - nmCov.calculateIfDirty(); - - assertTrue(nmCov.toDetailString().contains("HPUB sub reply 21 21")); + assertTrue(nmCov.toDetailString().contains("PUB sub reply 0")); assertTrue(nmCov.toDetailString().contains("next=No")); nmCov.protocolBab = null; @@ -300,8 +289,8 @@ public void constructorWithMessage() { public void testFactoryProducesStatusMessage() { IncomingHeadersProcessor incomingHeadersProcessor = new IncomingHeadersProcessor("NATS/1.0 503 No Responders\r\n".getBytes()); - NatsMessage.InternalMessageFactory factory = - new NatsMessage.InternalMessageFactory("sid", "subj", "replyTo", 0, false); + IncomingMessageFactory factory = + new IncomingMessageFactory("sid", "subj", "replyTo", 0, false); factory.setHeaders(incomingHeadersProcessor); factory.setData(null); // coverage @@ -310,7 +299,7 @@ public void testFactoryProducesStatusMessage() { assertNotNull(m.getStatus()); assertEquals(503, m.getStatus().getCode()); assertNotNull(m.getStatus().toString()); - NatsMessage.StatusMessage sm = (NatsMessage.StatusMessage)m; + StatusMessage sm = (StatusMessage)m; assertNotNull(sm.toString()); } diff --git a/src/test/java/io/nats/client/impl/TLSConnectTests.java b/src/test/java/io/nats/client/impl/TLSConnectTests.java index d968b77da..211b4ebf9 100644 --- a/src/test/java/io/nats/client/impl/TLSConnectTests.java +++ b/src/test/java/io/nats/client/impl/TLSConnectTests.java @@ -214,7 +214,7 @@ public void testTLSOnReconnect() throws InterruptedException, Exception { SSLContext ctx = TestSSLUtils.createTestSSLContext(); Options options = new Options.Builder(). server(ts.getURI()). - server(NatsTestServer.getURIForPort(newPort)). + server(NatsTestServer.getNatsLocalhostUri(newPort)). maxReconnects(-1). sslContext(ctx). connectionListener(handler).