nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
+ if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
+ || NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
+ AbstractSession abstractSession
+ = ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
+ abstractSession.activateNoFlowControl();
+ }
}
return true;
}
@@ -157,7 +171,7 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
*
- * This default implementation does not marshal any extension.
+ * This default implementation marshals a {@link NoFlowControl} extension}.
*
*
* @param session {@link Session} to send the KEX extension information for
@@ -165,5 +179,11 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
* @param marshaller {@link BiConsumer} writing the extensions into an SSH message
*/
public void collectExtensions(Session session, KexPhase phase, BiConsumer marshaller) {
+ // no-flow-control
+ Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
+ if (nfc == null || nfc) {
+ marshaller.accept(NoFlowControl.NAME,
+ nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
+ }
}
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/kex/extension/DefaultServerKexExtensionHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/kex/extension/DefaultServerKexExtensionHandler.java
index 7d5924622..47a2db70e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/kex/extension/DefaultServerKexExtensionHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/kex/extension/DefaultServerKexExtensionHandler.java
@@ -19,23 +19,29 @@
package org.apache.sshd.common.kex.extension;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.sshd.common.AttributeRepository.AttributeKey;
import org.apache.sshd.common.kex.KexProposalOption;
+import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.helpers.AbstractSession;
import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.apache.sshd.core.CoreModuleProperties;
/**
* A basic default implementation of a server-side {@link KexExtensionHandler} handling the
- * {@link ServerSignatureAlgorithms} KEX extension.
+ * {@link ServerSignatureAlgorithms} KEX extension along with the {@link NoFlowControl} extension.
*
* @see RFC 8308
*/
@@ -130,6 +136,23 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
}
}
+ @Override
+ public boolean handleKexExtensionRequest(
+ Session session, int index, int count, String name, byte[] data)
+ throws IOException {
+ if (NoFlowControl.NAME.equals(name)) {
+ String o = NoFlowControl.INSTANCE.parseExtension(data);
+ Optional nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
+ if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
+ || NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
+ AbstractSession abstractSession
+ = ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
+ abstractSession.activateNoFlowControl();
+ }
+ }
+ return true;
+ }
+
/**
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
@@ -144,6 +167,7 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
*/
@SuppressWarnings("javadoc")
public void collectExtensions(Session session, KexPhase phase, BiConsumer marshaller) {
+ // server-sig-algs
if (phase == KexPhase.NEWKEYS) {
Collection algorithms = session.getSignatureFactoriesNames();
if (!GenericUtils.isEmpty(algorithms)) {
@@ -157,5 +181,11 @@ public void collectExtensions(Session session, KexPhase phase, BiConsumer T resolveAttribute(Session session, AttributeRepository.AttributeKey<
T value = session.getAttribute(key);
return (value != null) ? value : FactoryManager.resolveAttribute(session.getFactoryManager(), key);
}
+
+ /**
+ * Check if the no-flow-control KEX extension has been activated.
+ */
+ boolean isNoFlowControl();
+
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 9251d651d..e3f42f662 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -757,6 +757,13 @@ protected void channelOpen(Buffer buffer) throws Exception {
return;
}
+ if (getSession().isNoFlowControl() && !channels.isEmpty()) {
+ // TODO add language tag configurable control
+ sendChannelOpenFailure(buffer, sender, SshConstants.SSH_OPEN_CONNECT_FAILED,
+ "Only a single channel can be opened when using no-flow-control", "");
+ return;
+ }
+
Session session = getSession();
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
Channel channel = ChannelFactory.createChannel(session, manager.getChannelFactories(), type);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 31589a4bf..12a325994 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -304,6 +304,8 @@ public abstract class AbstractSession extends SessionHelper {
private byte[] clientKexData; // the payload of the client's SSH_MSG_KEXINIT
private byte[] serverKexData; // the payload of the server's SSH_MSG_KEXINIT
+ private boolean noFlowControl;
+
/**
* Create a new session.
*
@@ -2769,6 +2771,22 @@ protected abstract void receiveKexInit(
Map proposal, byte[] seed)
throws IOException;
+ /**
+ * Activate the no-flow-control KEX extension specified by https://tools.ietf.org/html/rfc8308#section-3.3
+ */
+ public void activateNoFlowControl() {
+ this.noFlowControl = true;
+ log.info("activateNoFlowControl({}): deactivating flow control", this);
+ }
+
+ /**
+ * Check if the no-flow-control KEX extension has been activated.
+ */
+ @Override
+ public boolean isNoFlowControl() {
+ return noFlowControl;
+ }
+
/**
* Retrieve the SSH session from the I/O session. If the session has not been attached, an exception will be thrown
*
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java
index a5853e06e..03c400963 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java
@@ -40,6 +40,7 @@
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.kex.extension.KexExtensions;
import org.apache.sshd.common.util.ExceptionUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -236,7 +237,7 @@ public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) thr
int cmd = bufData[buffer.rpos()] & 0xFF;
boolean enqueued = false;
boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST && cmd != SshConstants.SSH_MSG_SERVICE_REQUEST
- && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT;
+ && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT && cmd != KexExtensions.SSH_MSG_EXT_INFO;
IoWriteFuture future = null;
try {
if (isLowLevelMessage) {
diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
index d8c8d35c1..3b6f4e4e5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -26,6 +26,7 @@
import org.apache.sshd.common.Property;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.OsUtils;
import org.apache.sshd.common.util.ValidateUtils;
@@ -800,6 +801,13 @@ public final class CoreModuleProperties {
}
});
+ /**
+ * Configuration value to enable {@code no-flow-control}. When set to {@code true}, the option will be used if the
+ * connected peer also supports it. A value of {@code false} disables the {@code no-flow-control} completely, while
+ * the default {@code null} value, will support it, but not request it.
+ */
+ public static final Property NO_FLOW_CONTROL = Property.bool(NoFlowControl.NAME);
+
private CoreModuleProperties() {
throw new UnsupportedOperationException("No instance");
}
diff --git a/sshd-core/src/test/java/org/apache/sshd/NoFlowControlTest.java b/sshd-core/src/test/java/org/apache/sshd/NoFlowControlTest.java
new file mode 100644
index 000000000..b19f3808b
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/NoFlowControlTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.common.util.io.output.NoCloseOutputStream;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.core.CoreModuleProperties;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.channel.ChannelSession;
+import org.apache.sshd.server.command.AsyncCommand;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the no-flow-control extension.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class NoFlowControlTest extends BaseTestSupport {
+
+ public static final byte END_FILE = '#';
+ public static final int BIG_MSG_SEND_COUNT = 10000;
+
+ @Rule
+ public Timeout timeout = Timeout.seconds(TimeUnit.MINUTES.toSeconds(6));
+
+ private SshServer sshServer;
+ private int port;
+
+ public NoFlowControlTest() {
+ super();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sshServer = setupTestServer();
+
+ byte[] msg = IoUtils.toByteArray(getClass().getResourceAsStream("/big-msg.txt"));
+ sshServer.setShellFactory(
+ channel -> new FloodingAsyncCommand(msg, BIG_MSG_SEND_COUNT, END_FILE));
+
+ sshServer.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+ sshServer.start();
+ port = sshServer.getPort();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (sshServer != null) {
+ sshServer.stop();
+ sshServer.close(true);
+ }
+ }
+
+ @Test
+ public void testNoFlowControl() throws Exception {
+ try (SshClient client = setupTestClient()) {
+ CoreModuleProperties.NO_FLOW_CONTROL.set(client, true);
+ client.start();
+
+ try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
+ .verify(CONNECT_TIMEOUT).getSession()) {
+ session.addPasswordIdentity(getCurrentTestName());
+ session.auth().verify(AUTH_TIMEOUT);
+
+ try (ClientChannel channel = session.createShellChannel()) {
+ channel.setOut(new VerifyingOutputStream(channel, END_FILE));
+ channel.setErr(new NoCloseOutputStream(System.err));
+ channel.open().verify(OPEN_TIMEOUT);
+
+ assertTrue(session.isNoFlowControl());
+ assertTrue(sshServer.getActiveSessions().get(0).isNoFlowControl());
+
+ Collection result
+ = channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), TimeUnit.MINUTES.toMillis(2L));
+ assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannelEvent.TIMEOUT));
+ }
+ } finally {
+ client.stop();
+ }
+ }
+ }
+
+ /**
+ * Read all incoming data and if END_FILE symbol is detected, kill client session to end test
+ */
+ private static class VerifyingOutputStream extends OutputStream {
+ private final Logger log;
+ private final ClientChannel channel;
+ private final byte eofSignal;
+
+ VerifyingOutputStream(ClientChannel channel, final byte eofSignal) {
+ this.log = LoggerFactory.getLogger(getClass());
+ this.channel = channel;
+ this.eofSignal = eofSignal;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (channel.isClosed() || channel.isClosing()) {
+ throw new IOException("Channel (" + channel + ") is closing / closed on write single byte");
+ }
+
+ if (b == (eofSignal & 0xff)) {
+ log.info("Closing channel (" + channel + ") due to single byte EOF");
+ channel.close(true);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (channel.isClosed() || channel.isClosing()) {
+ throw new IOException("Channel (" + channel + ") is closing / closed on write " + len + " bytes");
+ }
+
+ if (len <= 0) {
+ return;
+ }
+
+ int lastPos = off + len - 1;
+ if ((b[lastPos] & 0xFF) == (eofSignal & 0xFF)) {
+ log.info("Closing channel (" + channel + ") due to last byte EOF");
+ channel.close(true);
+ }
+ }
+ }
+
+ public static final class FloodingAsyncCommand extends AbstractLoggingBean implements AsyncCommand {
+ private static final AtomicInteger POOL_COUNT = new AtomicInteger(0);
+
+ private final AtomicReference executorHolder = new AtomicReference<>();
+ private final AtomicReference> futureHolder = new AtomicReference<>();
+
+ private AsyncInPendingWrapper pendingWrapper;
+ private byte[] msg;
+ private int sendCount;
+ private byte eofSignal;
+
+ public FloodingAsyncCommand(final byte[] msg, final int sendCount, final byte eofSignal) {
+ this.msg = msg;
+ this.sendCount = sendCount;
+ this.eofSignal = eofSignal;
+ }
+
+ @Override
+ public void setIoInputStream(IoInputStream in) {
+ // ignored
+ }
+
+ @Override
+ public void setIoOutputStream(IoOutputStream out) {
+ pendingWrapper = new AsyncInPendingWrapper(out);
+ }
+
+ @Override
+ public void setIoErrorStream(IoOutputStream err) {
+ // ignored
+ }
+
+ @Override
+ public void setInputStream(InputStream in) {
+ // ignored
+ }
+
+ @Override
+ public void setOutputStream(OutputStream out) {
+ // ignored
+ }
+
+ @Override
+ public void setErrorStream(OutputStream err) {
+ // ignored
+ }
+
+ @Override
+ public void setExitCallback(ExitCallback callback) {
+ // ignored
+ }
+
+ @Override
+ public void start(ChannelSession channel, Environment env) throws IOException {
+ log.info("Starting");
+
+ ExecutorService service
+ = ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName() + "-" + POOL_COUNT.incrementAndGet());
+ executorHolder.set(service);
+
+ futureHolder.set(service.submit((Runnable) () -> {
+ log.info("Start heavy load sending " + sendCount + " messages of " + msg.length + " bytes");
+ for (int i = 0; i < sendCount; i++) {
+ try {
+ pendingWrapper.write(new ByteArrayBuffer(msg));
+ } catch (IOException e) {
+ log.error("Failed ({}) to send message #{}/{}: {}",
+ e.getClass().getSimpleName(), i + 1, sendCount, e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ log.info("Sending EOF signal");
+
+ try {
+ pendingWrapper.write(new ByteArrayBuffer(new byte[] { eofSignal }));
+ } catch (IOException e) {
+ log.error("Failed ({}) to send EOF message after {} messages: {}",
+ e.getClass().getSimpleName(), sendCount, e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }));
+ log.info("Started");
+ }
+
+ @Override
+ public void destroy(ChannelSession channel) {
+ log.info("Destroying");
+
+ Future> future = futureHolder.getAndSet(null);
+ if ((future != null) && (!future.isDone())) {
+ log.info("Cancelling");
+ future.cancel(true);
+ }
+
+ ExecutorService service = executorHolder.getAndSet(null);
+ if ((service != null) && (!service.isShutdown())) {
+ log.info("Shutdown");
+ service.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Wrapper for asyncIn stream that catches Pending exception and queues the pending messages for later retry (send
+ * after previous messages were fully transfered)
+ */
+ private static class AsyncInPendingWrapper extends AbstractLoggingBean {
+ private IoOutputStream asyncIn;
+
+ // Order has to be preserved for queued writes
+ private final Deque pending = new LinkedList<>();
+
+ AsyncInPendingWrapper(IoOutputStream out) {
+ this.asyncIn = out;
+ }
+
+ public synchronized void write(Object msg) throws IOException {
+ if ((asyncIn != null) && (!asyncIn.isClosed()) && (!asyncIn.isClosing())) {
+ Buffer byteBufferMsg = (Buffer) msg;
+ if (!pending.isEmpty()) {
+ queueRequest(byteBufferMsg);
+ return;
+ }
+
+ writeWithPendingDetection(byteBufferMsg, false);
+ }
+ }
+
+ private void writeWithPendingDetection(Buffer msg, boolean wasPending) throws IOException {
+ try {
+ asyncIn.writeBuffer(msg).addListener(future -> {
+ if (future.isWritten()) {
+ try {
+ if (wasPending) {
+ synchronized (AsyncInPendingWrapper.this) {
+ pending.remove();
+ }
+ }
+ writePendingIfAny();
+ } catch (Throwable e) {
+ log.error("Failed ({}) to re-write pending: {}", e.getClass().getSimpleName(), e.getMessage());
+ }
+ } else {
+ Throwable t = future.getException();
+ log.warn("Failed to write message", t);
+ }
+ });
+ } catch (WritePendingException e) {
+ if (!wasPending) {
+ queueRequest(msg);
+ }
+ } catch (Throwable t) {
+ log.error("Failed to write or queue", t);
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ throw new IOException("Failed to write or queue", t);
+ }
+ }
+
+ private synchronized void writePendingIfAny() throws IOException {
+ if (pending.peek() == null) {
+ return;
+ }
+
+ Buffer msg = pending.peek();
+ writeWithPendingDetection(msg, true);
+ }
+
+ private void queueRequest(Buffer msg) {
+ msg.rpos(0);
+ pending.add(msg);
+ }
+ }
+}