Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert removal of client side checks #981

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,6 @@ In this release, support was added to

For details on the other features, see the "Options" sections

#### Version 2.16.12: Max Payload Check

As of version 2.16.12, there is no longer client side checking
1. that a message payload is less than the server configuration (Core and JetStream publishes)
2. is less than the stream configuration (JetStream publishes)

Please see unit test for examples of this behavior.
`testMaxPayload` in [PublishTests.java](src/test/java/io/nats/client/PublishTests.java)
and
`testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java)

#### Version 2.16.8: Websocket Support

As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server.
Expand Down Expand Up @@ -247,6 +236,7 @@ When options are built, the SSLContext will be accepted or created in the follow
| io.nats.client.norandomize | Property used to configure noRandomize. |
| io.nats.client.noResolveHostnames | Property used to configure noResolveHostnames. |
| io.nats.client.reportNoResponders | Property used to configure reportNoResponders. |
| io.nats.clientsidelimitchecks | Property use to configure clientsidelimitchecks. |
| io.nats.client.url | Property used to configure server. The value can be a comma-separated list of server URLs. |
| io.nats.client.servers | Property used to configure servers. The value can be a comma-separated list of server URLs. |
| io.nats.client.password | Property used to configure userinfo password. |
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ public class Options {
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#reportNoResponders() reportNoResponders}.
*/
public static final String PROP_REPORT_NO_RESPONDERS = PFX + "reportNoResponders";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#clientSideLimitChecks() clientSideLimitChecks}.
*/
public static final String PROP_CLIENT_SIDE_LIMIT_CHECKS = PFX + "clientsidelimitchecks";
/**
* Property used to configure a builder from a Properties object. {@value},
* see {@link Builder#servers(String[]) servers}. The value can be a comma-separated list of server URLs.
Expand Down Expand Up @@ -423,12 +427,6 @@ public class Options {
* Property used to set the path to a credentials file to be used in a FileAuthHandler
*/
public static final String PROP_CREDENTIAL_PATH = PFX + "credential.path";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#clientSideLimitChecks() clientSideLimitChecks}.
* @deprecated Client Side Limit checks are no longer performed.
*/
@Deprecated
public static final String PROP_CLIENT_SIDE_LIMIT_CHECKS = PFX + "clientsidelimitchecks";
/**
* This property is used to enable support for UTF8 subjects. See {@link Builder#supportUTF8Subjects() supportUTF8Subjects()}
* @deprecated only plain ascii subjects are supported
Expand Down Expand Up @@ -557,6 +555,7 @@ public class Options {
private final boolean noEcho;
private final boolean noHeaders;
private final boolean noNoResponders;
private final boolean clientSideLimitChecks;
private final int maxMessagesInOutgoingQueue;
private final boolean discardMessagesWhenOutgoingQueueFull;
private final boolean ignoreDiscoveredServers;
Expand Down Expand Up @@ -661,6 +660,7 @@ public static class Builder {
private boolean noEcho = false;
private boolean noHeaders = false;
private boolean noNoResponders = false;
private boolean clientSideLimitChecks = true;
private String inboxPrefix = DEFAULT_INBOX_PREFIX;
private int maxMessagesInOutgoingQueue = DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE;
private boolean discardMessagesWhenOutgoingQueueFull = DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL;
Expand Down Expand Up @@ -762,6 +762,7 @@ public Builder properties(Properties props) {
booleanProperty(props, PROP_NO_ECHO, b -> this.noEcho = b);
booleanProperty(props, PROP_NO_HEADERS, b -> this.noHeaders = b);
booleanProperty(props, PROP_NO_NORESPONDERS, b -> this.noNoResponders = b);
booleanProperty(props, PROP_CLIENT_SIDE_LIMIT_CHECKS, b -> this.clientSideLimitChecks = b);
booleanProperty(props, PROP_PEDANTIC, b -> this.pedantic = b);

intProperty(props, PROP_MAX_RECONNECT, DEFAULT_MAX_RECONNECT, i -> this.maxReconnect = i);
Expand Down Expand Up @@ -896,12 +897,12 @@ public Builder noNoResponders() {
}

/**
* @deprecated Client Side Limit checks are no longer performed.
* Set client side limit checks. Default is true
* @param checks the checks flag
* @return the Builder for chaining
*/
@Deprecated
public Builder clientSideLimitChecks(boolean checks) {
this.clientSideLimitChecks = checks;
return this;
}

Expand Down Expand Up @@ -1602,6 +1603,7 @@ public Builder(Options o) {
this.noEcho = o.noEcho;
this.noHeaders = o.noHeaders;
this.noNoResponders = o.noNoResponders;
this.clientSideLimitChecks = o.clientSideLimitChecks;
this.inboxPrefix = o.inboxPrefix;
this.traceConnection = o.traceConnection;
this.maxMessagesInOutgoingQueue = o.maxMessagesInOutgoingQueue;
Expand Down Expand Up @@ -1661,6 +1663,7 @@ private Options(Builder b) {
this.noEcho = b.noEcho;
this.noHeaders = b.noHeaders;
this.noNoResponders = b.noNoResponders;
this.clientSideLimitChecks = b.clientSideLimitChecks;
this.inboxPrefix = b.inboxPrefix;
this.traceConnection = b.traceConnection;
this.maxMessagesInOutgoingQueue = b.maxMessagesInOutgoingQueue;
Expand Down Expand Up @@ -1840,12 +1843,10 @@ public boolean isNoNoResponders() {
}

/**
* @deprecated Client Side Limit checks are no longer performed.
* @return clientSideLimitChecks flag
*/
@Deprecated
public boolean clientSideLimitChecks() {
return false;
return clientSideLimitChecks;
}

/**
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ public void publish(Message message) {

void publishInternal(String subject, String replyTo, Headers headers, byte[] data) {
checkIfNeedsHeaderSupport(headers);
checkPayloadSize(data);

if (isClosed()) {
throw new IllegalStateException("Connection is Closed");
Expand All @@ -847,6 +848,12 @@ private void checkIfNeedsHeaderSupport(Headers headers) {
}
}

private void checkPayloadSize(byte[] body) {
if (options.clientSideLimitChecks() && body != null && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) {
throw new IllegalArgumentException(
"Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
}
}
/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -1120,6 +1127,8 @@ public CompletableFuture<Message> request(Message message) {
}

CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction) {
checkPayloadSize(data);

if (isClosed()) {
throw new IllegalStateException("Connection is Closed");
} else if (isDraining()) {
Expand Down
31 changes: 20 additions & 11 deletions src/test/java/io/nats/client/OptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package io.nats.client;

import io.nats.client.ConnectionListener.Events;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.ErrorListenerLoggerImpl;
import io.nats.client.impl.TestHandler;
import io.nats.client.impl.TestStatisticsCollector;
import io.nats.client.impl.*;
import io.nats.client.support.HttpRequest;
import io.nats.client.support.NatsUri;
import io.nats.client.utils.CloseOnUpgradeAttempt;
Expand Down Expand Up @@ -354,23 +351,35 @@ private static void _testPropertiesSSLOptions(Options o) {
@SuppressWarnings("deprecation")
@Test
public void testDeprecated() {
// clientSideLimitChecks, supportUTF8Subjects are deprecated and always returns false
// supportUTF8Subjects are deprecated and always returns false
Options o = new Options.Builder().build();
assertFalse(o.clientSideLimitChecks());
assertFalse(o.supportUTF8Subjects());

o = new Options.Builder().clientSideLimitChecks(true).supportUTF8Subjects().build();
assertFalse(o.clientSideLimitChecks());
o = new Options.Builder().supportUTF8Subjects().build();
assertFalse(o.supportUTF8Subjects());

Properties props = new Properties();
props.setProperty(Options.PROP_CLIENT_SIDE_LIMIT_CHECKS, "true");
props.setProperty(Options.PROP_UTF8_SUBJECTS, "true");
o = new Options.Builder(props).build();
assertFalse(o.clientSideLimitChecks());
assertFalse(o.supportUTF8Subjects());
}

@Test
public void testBuilderCoverageOptions() {
Options o = new Options.Builder().build();
assertTrue(o.clientSideLimitChecks());
assertNull(o.getServerPool()); // there is a default provider

o = new Options.Builder().clientSideLimitChecks(true).build();
assertTrue(o.clientSideLimitChecks());
o = new Options.Builder()
.clientSideLimitChecks(false)
.serverPool(new NatsServerPool())
.build();
assertFalse(o.clientSideLimitChecks());
assertNotNull(o.getServerPool());
}

@Test
public void testProperties() throws Exception {
Properties props = new Properties();
Expand Down Expand Up @@ -492,7 +501,7 @@ private static void _testPropertiesCoverageOptions(Options o) {
assertNull(o.getSslContext());
assertTrue(o.isNoHeaders());
assertTrue(o.isNoNoResponders());
assertFalse(o.clientSideLimitChecks()); // clientSideLimitChecks is deprecated and always returns false
assertTrue(o.clientSideLimitChecks());
assertTrue(o.isIgnoreDiscoveredServers());
assertTrue(o.isNoResolveHostnames());
}
Expand Down
59 changes: 55 additions & 4 deletions src/test/java/io/nats/client/PublishTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static io.nats.client.support.NatsConstants.*;
Expand Down Expand Up @@ -64,6 +66,45 @@ public void testThrowsWithoutSubject() {
});
}

@Test
public void testThrowsIfTooBig() throws Exception {
try (NatsTestServer ts = new NatsTestServer("src/test/resources/max_payload.conf", false, false))
{
Connection nc = Nats.connect(ts.getURI());
assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status");

byte[] body = new byte[1001];
assertThrows(IllegalArgumentException.class, () -> nc.publish("subject", null, null, body));
nc.close();

AtomicBoolean mpv = new AtomicBoolean(false);
AtomicBoolean se = new AtomicBoolean(false);
ErrorListener el = new ErrorListener() {
@Override
public void errorOccurred(Connection conn, String error) {
mpv.set(error.contains("Maximum Payload Violation"));
}

@Override
public void exceptionOccurred(Connection conn, Exception exp) {
se.set(exp instanceof SocketException);
}
};
Options options = Options.builder()
.server(ts.getURI())
.clientSideLimitChecks(false)
.errorListener(el)
.build();
Connection nc2 = Nats.connect(options);
assertSame(Connection.Status.CONNECTED, nc2.getStatus(), "Connected Status");
nc2.publish("subject", null, null, body);

sleep(100);
assertTrue(mpv.get());
assertTrue(se.get());
}
}

@Test
public void testThrowsIfheadersNotSupported() {
assertThrows(IllegalArgumentException.class, () -> {
Expand Down Expand Up @@ -197,17 +238,27 @@ public void testMaxPayload() throws Exception {
nc.publish("mptest", new byte[maxPayload-1]);
nc.publish("mptest", new byte[maxPayload]);
});

try {
runInServer(standardOptionsBuilder().noReconnect(), nc -> {
runInServer(standardOptionsBuilder().noReconnect().clientSideLimitChecks(false), nc -> {
int maxPayload = (int)nc.getServerInfo().getMaxPayload();
for (int x = 1; x < 1000; x++) {
nc.publish("mptest", new byte[maxPayload + x]);
}
});
fail("Expecting IllegalStateException");
}
catch (IllegalStateException e) {
return;
catch (IllegalStateException ignore) {}

try {
runInServer(standardOptionsBuilder().noReconnect(), nc -> {
int maxPayload = (int)nc.getServerInfo().getMaxPayload();
for (int x = 1; x < 1000; x++) {
nc.publish("mptest", new byte[maxPayload + x]);
}
});
fail("Expecting IllegalArgumentException");
}
fail("Expecting connection to be closed");
catch (IllegalArgumentException ignore) {}
}
}
1 change: 1 addition & 0 deletions src/test/resources/max_payload.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
max_payload: 1000