diff --git a/ballerina/http_client_connection_pool.bal b/ballerina/http_client_connection_pool.bal index 06e3636fa4..25a74d1e31 100644 --- a/ballerina/http_client_connection_pool.bal +++ b/ballerina/http_client_connection_pool.bal @@ -20,6 +20,10 @@ configurable int maxActiveConnections = -1; configurable int maxIdleConnections = 100; configurable decimal waitTime = 30; configurable int maxActiveStreamsPerConnection = 100; +configurable decimal minEvictableIdleTime = 300; +configurable decimal timeBetweenEvictionRuns = 30; +configurable decimal minIdleTimeInStaleState = 300; +configurable decimal timeBetweenStaleEviction = 30; # Configurations for managing HTTP client connection pool. # @@ -29,14 +33,22 @@ configurable int maxActiveStreamsPerConnection = 100; # + maxActiveStreamsPerConnection - Maximum active streams per connection. This only applies to HTTP/2. Default value is 100 # + minEvictableIdleTime - Minimum evictable time for an idle connection in seconds. Default value is 5 minutes # + timeBetweenEvictionRuns - Time between eviction runs in seconds. Default value is 30 seconds +# + minIdleTimeInStaleState - Minimum time in seconds for a connection to be kept open which has received a GOAWAY. +# This only applies for HTTP/2. Default value is 5 minutes. If the value is set to -1, +# the connection will be closed after all in-flight streams are completed +# + timeBetweenStaleEviction - Time between the connection stale eviction runs in seconds. This only applies for HTTP/2. +# Default value is 30 seconds public type PoolConfiguration record {| int maxActiveConnections = maxActiveConnections; int maxIdleConnections = maxIdleConnections; decimal waitTime = waitTime; int maxActiveStreamsPerConnection = maxActiveStreamsPerConnection; - decimal minEvictableIdleTime = 300; - decimal timeBetweenEvictionRuns = 30; + decimal minEvictableIdleTime = minEvictableIdleTime; + decimal timeBetweenEvictionRuns = timeBetweenEvictionRuns; + decimal minIdleTimeInStaleState = minIdleTimeInStaleState; + decimal timeBetweenStaleEviction = timeBetweenStaleEviction; |}; + //This is a hack to get the global map initialized, without involving locking. class ConnectionManager { public PoolConfiguration & readonly poolConfig = {}; diff --git a/changelog.md b/changelog.md index 3526e6b43b..3dc64a4314 100644 --- a/changelog.md +++ b/changelog.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - [Expose HTTP connection eviction configurations in the client level](https://github.com/ballerina-platform/ballerina-library/issues/5951) +- [Handle GO_AWAY received HTTP/2 clients gracefully](https://github.com/ballerina-platform/ballerina-library/issues/4806) ### Fixed - [Remove unused import from Http2StateUtil](https://github.com/ballerina-platform/ballerina-library/issues/5966) diff --git a/native/src/main/java/io/ballerina/stdlib/http/api/HttpConstants.java b/native/src/main/java/io/ballerina/stdlib/http/api/HttpConstants.java index 286e621115..c491ba9b8e 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/api/HttpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/http/api/HttpConstants.java @@ -510,6 +510,10 @@ public final class HttpConstants { "minEvictableIdleTime"); public static final BString CONNECTION_POOLING_TIME_BETWEEN_EVICTION_RUNS = StringUtils.fromString( "timeBetweenEvictionRuns"); + public static final BString CONNECTION_POOLING_IDLE_TIME_STALE_STATE = StringUtils.fromString( + "minIdleTimeInStaleState"); + public static final BString CONNECTION_POOLING_TIME_BETWEEN_STALE_CHECK_RUNS = StringUtils.fromString( + "timeBetweenStaleEviction"); public static final String HTTP_CLIENT_CONNECTION_POOL = "PoolConfiguration"; public static final String CONNECTION_MANAGER = "ConnectionManager"; public static final int POOL_CONFIG_INDEX = 1; diff --git a/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java b/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java index 6d8b96e59e..0e983ed339 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java @@ -1359,8 +1359,21 @@ public static void populatePoolingConfig(BMap poolRecord, PoolConfiguration pool double timeBetweenEvictionRuns = ((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_TIME_BETWEEN_EVICTION_RUNS)).floatValue(); - poolConfiguration.setTimeBetweenEvictionRuns( - timeBetweenEvictionRuns < 0 ? 0 : (long) timeBetweenEvictionRuns * 1000); + if (timeBetweenEvictionRuns > 0) { + poolConfiguration.setTimeBetweenEvictionRuns((long) timeBetweenEvictionRuns * 1000); + } + + double minIdleTimeInStaleState = + ((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_IDLE_TIME_STALE_STATE)).floatValue(); + poolConfiguration.setMinIdleTimeInStaleState(minIdleTimeInStaleState < -1 ? -1 : + (long) minEvictableIdleTime * 1000); + + double timeBetweenStaleEviction = + ((BDecimal) poolRecord.get(HttpConstants.CONNECTION_POOLING_TIME_BETWEEN_STALE_CHECK_RUNS)) + .floatValue(); + if (timeBetweenStaleEviction > 0) { + poolConfiguration.setTimeBetweenStaleEviction((long) timeBetweenStaleEviction * 1000); + } } private static int validateConfig(long value, String configName) { diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contract/Constants.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contract/Constants.java index a99caa264e..5f9f282e7a 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contract/Constants.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contract/Constants.java @@ -345,6 +345,8 @@ public final class Constants { = "Idle timeout triggered before initiating outbound request"; public static final String IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS = "Idle timeout triggered while writing outbound request headers"; + public static final String IDLE_TIMEOUT_TRIGGERED_WHILE_SENDING_RST_STREAM + = "Idle timeout triggered while sending RST_STREAM frame"; public static final String IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_BODY = "Idle timeout triggered while writing outbound request entity body"; public static final String IDLE_TIMEOUT_TRIGGERED_BEFORE_INITIATING_INBOUND_RESPONSE @@ -386,6 +388,8 @@ public final class Constants { = "Remote host closed the connection before initiating outbound request"; public static final String REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS = "Remote host closed the connection while writing outbound request headers"; + public static final String REMOTE_SERVER_CLOSED_WHILE_SENDING_RST_STREAM + = "Remote host closed the connection while sending RST_STREAM frame"; public static final String REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_BODY = "Remote host closed the connection while writing outbound request entity body"; public static final String REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE @@ -396,6 +400,32 @@ public final class Constants { = "Remote host closed the connection while reading inbound response body"; public static final String REMOTE_SERVER_CLOSED_BEFORE_READING_100_CONTINUE_RESPONSE = "Remote host closed the connection before reading 100 continue response"; + // Server GOAWAY error scenarios + public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS + = "Remote host sent GOAWAY while writing outbound request headers"; + public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_SENDING_RST_STREAM + = "Remote host sent GOAWAY while sending RST_STREAM frame"; + public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_BODY + = "Remote host sent GOAWAY while writing outbound request entity body"; + public static final String REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE + = "Remote host sent GOAWAY before initiating inbound response"; + public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS + = "Remote host sent GOAWAY while reading inbound response headers"; + public static final String REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_BODY + = "Remote host sent GOAWAY while reading inbound response body"; + // Server send RST_STREAM error scenarios + public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS + = "Remote host sent RST_STREAM while writing outbound request headers"; + public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_SENDING_RST_STREAM + = "Remote host sent RST_STREAM while sending RST_STREAM frame"; + public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_BODY + = "Remote host sent RST_STREAM while writing outbound request entity body"; + public static final String REMOTE_SERVER_SENT_RST_STREAM_BEFORE_INITIATING_INBOUND_RESPONSE + = "Remote host sent RST_STREAM before initiating inbound response"; + public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_HEADERS + = "Remote host sent RST_STREAM while reading inbound response headers"; + public static final String REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_BODY + = "Remote host sent RST_STREAM while reading inbound response body"; public static final String REMOTE_CLIENT_TO_HOST_CONNECTION_CLOSED = "Connection between remote client and host is closed"; diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/DefaultHttpClientConnector.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/DefaultHttpClientConnector.java index 53da6e9c9b..1b6a34b4fc 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/DefaultHttpClientConnector.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/DefaultHttpClientConnector.java @@ -183,7 +183,7 @@ public HttpResponseFuture send(OutboundMsgHolder outboundMsgHolder, HttpCarbonMe this.configHashCode); if (http2) { // See whether an already upgraded HTTP/2 connection is available - Http2ClientChannel activeHttp2ClientChannel = http2ConnectionManager.borrowChannel(route); + Http2ClientChannel activeHttp2ClientChannel = http2ConnectionManager.fetchChannel(route); if (activeHttp2ClientChannel != null) { outboundMsgHolder.setHttp2ClientChannel(activeHttp2ClientChannel); diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/states/Http2StateUtil.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/states/Http2StateUtil.java index 708750b793..e8ed1daf54 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/states/Http2StateUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/states/Http2StateUtil.java @@ -370,7 +370,11 @@ private static int getNextStreamId(Http2Connection conn) { * @throws Http2Exception if a protocol-related error occurred */ private static void createStream(Http2Connection conn, int streamId) throws Http2Exception { - conn.local().createStream(streamId, false); + try { + conn.local().createStream(streamId, false); + } catch (Http2Exception.StreamException exception) { + throw new Http2Exception(exception.error(), "Error occurred while creating stream", exception); + } if (LOG.isDebugEnabled()) { LOG.debug("Stream created streamId: {}", streamId); } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/HttpClientChannelInitializer.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/HttpClientChannelInitializer.java index 8b83a42147..7f4b0c6b48 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/HttpClientChannelInitializer.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/HttpClientChannelInitializer.java @@ -219,7 +219,8 @@ private void configureSslForHttp2(SocketChannel ch, ChannelPipeline clientPipeli sslConfig.getCacheSize())); } } - clientPipeline.addLast(new Http2PipelineConfiguratorForClient(targetHandler, connectionAvailabilityFuture)); + clientPipeline.addLast( + new ALPNClientHandler(targetHandler, connectionAvailabilityFuture)); clientPipeline .addLast(Constants.HTTP2_EXCEPTION_HANDLER, new Http2ExceptionHandler(http2ConnectionHandler)); } @@ -329,13 +330,13 @@ public void setHttp2ClientChannel(Http2ClientChannel http2ClientChannel) { /** * A handler to create the pipeline based on the ALPN negotiated protocol. */ - class Http2PipelineConfiguratorForClient extends ApplicationProtocolNegotiationHandler { + class ALPNClientHandler extends ApplicationProtocolNegotiationHandler { private TargetHandler targetHandler; private ConnectionAvailabilityFuture connectionAvailabilityFuture; - public Http2PipelineConfiguratorForClient(TargetHandler targetHandler, - ConnectionAvailabilityFuture connectionAvailabilityFuture) { + public ALPNClientHandler(TargetHandler targetHandler, + ConnectionAvailabilityFuture connectionAvailabilityFuture) { super(ApplicationProtocolNames.HTTP_1_1); this.targetHandler = targetHandler; this.connectionAvailabilityFuture = connectionAvailabilityFuture; diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/ConnectionManager.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/ConnectionManager.java index 4bea045824..5fa329abe9 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/ConnectionManager.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/ConnectionManager.java @@ -52,7 +52,7 @@ public ConnectionManager(PoolConfiguration poolConfiguration) { globalConnPool = new ConcurrentHashMap<>(); globalFactoryObjects = new ConcurrentHashMap<>(); http2ConnectionManager = new Http2ConnectionManager(poolConfiguration); - connectionManagerId = "-" + UUID.randomUUID().toString(); + connectionManagerId = "-" + UUID.randomUUID(); } /** diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/PoolConfiguration.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/PoolConfiguration.java index 9ab58dc6d0..bbc94f93fb 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/PoolConfiguration.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/channel/pool/PoolConfiguration.java @@ -35,6 +35,8 @@ public class PoolConfiguration { private int eventGroupExecutorThreads = 15; private long maxWaitTime = 60000L; private int http2MaxActiveStreamsPerConnection = Integer.MAX_VALUE; + private long minIdleTimeInStaleState = 300000; + private long timeBetweenStaleEviction = 30000; public PoolConfiguration() { } @@ -142,4 +144,20 @@ public int getHttp2MaxActiveStreamsPerConnection() { public void setHttp2MaxActiveStreamsPerConnection(int http2MaxActiveStreamsPerConnection) { this.http2MaxActiveStreamsPerConnection = http2MaxActiveStreamsPerConnection; } + + public long getMinIdleTimeInStaleState() { + return minIdleTimeInStaleState; + } + + public void setMinIdleTimeInStaleState(long minIdleTimeInStaleState) { + this.minIdleTimeInStaleState = minIdleTimeInStaleState; + } + + public long getTimeBetweenStaleEviction() { + return timeBetweenStaleEviction; + } + + public void setTimeBetweenStaleEviction(long timeBetweenStaleEviction) { + this.timeBetweenStaleEviction = timeBetweenStaleEviction; + } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ChannelPool.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ChannelPool.java index bb77cc2259..baf4786251 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ChannelPool.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ChannelPool.java @@ -79,7 +79,7 @@ synchronized Http2ClientChannel fetchTargetChannel() { } Channel channel = http2ClientChannel.getChannel(); if (channel == null) { // if channel is not active, forget it and fetch next one - http2ClientChannels.remove(http2ClientChannel); + removeChannel(http2ClientChannel); return fetchTargetChannel(); } // increment and get active stream count @@ -89,7 +89,7 @@ synchronized Http2ClientChannel fetchTargetChannel() { return http2ClientChannel; } else if (activeStreamCount == maxActiveStreams) { // no more streams except this one can be opened http2ClientChannel.markAsExhausted(); - http2ClientChannels.remove(http2ClientChannel); + removeChannel(http2ClientChannel); // When the stream count reaches maxActiveStreams, a new channel will be added only if the // channel queue is empty. This process is synchronized as transport thread can return channels // after being reset. If such channel is returned before the new CountDownLatch, the subsequent @@ -104,7 +104,7 @@ synchronized Http2ClientChannel fetchTargetChannel() { } return http2ClientChannel; } else { - http2ClientChannels.remove(http2ClientChannel); + removeChannel(http2ClientChannel); return fetchTargetChannel(); // fetch the next one from the queue } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ClientChannel.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ClientChannel.java index 53af3e0e7c..ee3d0b0dbe 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ClientChannel.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ClientChannel.java @@ -21,6 +21,7 @@ import io.ballerina.stdlib.http.transport.contract.Constants; import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute; import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.handler.codec.http2.Http2Connection; @@ -60,6 +61,8 @@ public class Http2ClientChannel { private int socketIdleTimeout = Constants.ENDPOINT_TIMEOUT; private Map dataEventListeners; private StreamCloseListener streamCloseListener; + private long timeSinceMarkedAsStale = 0; + private AtomicBoolean isStale = new AtomicBoolean(false); public Http2ClientChannel(Http2ConnectionManager http2ConnectionManager, Http2Connection connection, HttpRoute httpRoute, Channel channel) { @@ -253,7 +256,7 @@ void destroy() { this.connection.removeListener(streamCloseListener); inFlightMessages.clear(); promisedMessages.clear(); - http2ConnectionManager.removeClientChannel(httpRoute, this); + removeFromConnectionPool(); } /** @@ -271,6 +274,10 @@ private void handleConnectionClose() { } } + ConcurrentHashMap getInFlightMessages() { + return inFlightMessages; + } + /** * Listener which listen to the stream closure event. */ @@ -289,13 +296,50 @@ public void onStreamClosed(Http2Stream stream) { activeStreams.decrementAndGet(); http2ClientChannel.getDataEventListeners(). forEach(dataEventListener -> dataEventListener.onStreamClose(stream.id())); - if (isExhausted.getAndSet(false)) { + if (!isStale.get() && isExhausted.getAndSet(false)) { http2ConnectionManager.returnClientChannel(httpRoute, http2ClientChannel); } } + + @Override + public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { + markAsStale(); + http2ClientChannel.inFlightMessages.forEach((streamId, outboundMsgHolder) -> { + if (streamId > lastStreamId) { + http2ClientChannel.removeInFlightMessage(streamId); + activeStreams.decrementAndGet(); + http2ClientChannel.getDataEventListeners().forEach( + dataEventListener -> dataEventListener.onStreamClose(streamId)); + Http2MessageStateContext messageStateContext = + outboundMsgHolder.getRequest().getHttp2MessageStateContext(); + if (messageStateContext != null) { + messageStateContext.getSenderState().handleServerGoAway(outboundMsgHolder); + } + } + }); + } } void removeFromConnectionPool() { http2ConnectionManager.removeClientChannel(httpRoute, this); } + + void markAsStale() { + synchronized (http2ConnectionManager) { + isStale.set(true); + http2ConnectionManager.markClientChannelAsStale(httpRoute, this); + } + } + + boolean hasInFlightMessages() { + return !inFlightMessages.isEmpty(); + } + + void setTimeSinceMarkedAsStale(long timeSinceMarkedAsStale) { + this.timeSinceMarkedAsStale = timeSinceMarkedAsStale; + } + + long getTimeSinceMarkedAsStale() { + return timeSinceMarkedAsStale; + } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ConnectionManager.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ConnectionManager.java index 18feaf8c90..9ea1d96166 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ConnectionManager.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ConnectionManager.java @@ -19,7 +19,17 @@ package io.ballerina.stdlib.http.transport.contractimpl.sender.http2; import io.ballerina.stdlib.http.transport.contractimpl.common.HttpRoute; +import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext; import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration; +import io.netty.channel.DefaultEventLoop; +import io.netty.util.concurrent.DefaultPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; /** * {@code Http2ConnectionManager} Manages HTTP/2 connections. @@ -27,10 +37,12 @@ public class Http2ConnectionManager { private final Http2ChannelPool http2ChannelPool = new Http2ChannelPool(); + private final BlockingQueue http2StaleClientChannels = new LinkedBlockingQueue<>(); private final PoolConfiguration poolConfiguration; public Http2ConnectionManager(PoolConfiguration poolConfiguration) { this.poolConfiguration = poolConfiguration; + initiateConnectionEvictionTask(); } /** @@ -110,15 +122,12 @@ private synchronized Http2ChannelPool.PerRouteConnectionPool createPerRouteConne * @param httpRoute the http route * @return Http2ClientChannel */ - public Http2ClientChannel borrowChannel(HttpRoute httpRoute) { + public Http2ClientChannel fetchChannel(HttpRoute httpRoute) { Http2ChannelPool.PerRouteConnectionPool perRouteConnectionPool; - perRouteConnectionPool = getOrCreatePerRoutePool(this.http2ChannelPool, generateKey(httpRoute)); - - Http2ClientChannel http2ClientChannel = null; - if (perRouteConnectionPool != null) { - http2ClientChannel = perRouteConnectionPool.fetchTargetChannel(); + synchronized (this) { + perRouteConnectionPool = getOrCreatePerRoutePool(this.http2ChannelPool, generateKey(httpRoute)); + return perRouteConnectionPool != null ? perRouteConnectionPool.fetchTargetChannel() : null; } - return http2ClientChannel; } /** @@ -147,6 +156,53 @@ void removeClientChannel(HttpRoute httpRoute, Http2ClientChannel http2ClientChan } } + void markClientChannelAsStale(HttpRoute httpRoute, Http2ClientChannel http2ClientChannel) { + Http2ChannelPool.PerRouteConnectionPool perRouteConnectionPool = fetchPerRoutePool(httpRoute); + if (perRouteConnectionPool != null) { + perRouteConnectionPool.removeChannel(http2ClientChannel); + } + http2ClientChannel.setTimeSinceMarkedAsStale(System.currentTimeMillis()); + http2StaleClientChannels.add(http2ClientChannel); + } + + private void initiateConnectionEvictionTask() { + Timer timer = new Timer(true); + TimerTask timerTask = new TimerTask() { + Logger logger = LoggerFactory.getLogger(this.getClass()); + @Override + public void run() { + http2StaleClientChannels.forEach(http2ClientChannel -> { + if (poolConfiguration.getMinIdleTimeInStaleState() == -1) { + if (!http2ClientChannel.hasInFlightMessages()) { + closeChannelAndEvict(http2ClientChannel); + } + } else if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsStale()) > + poolConfiguration.getMinIdleTimeInStaleState()) { + http2ClientChannel.getInFlightMessages().forEach((streamId, outboundMsgHolder) -> { + Http2MessageStateContext messageStateContext = + outboundMsgHolder.getRequest().getHttp2MessageStateContext(); + if (messageStateContext != null) { + messageStateContext.getSenderState().handleConnectionClose(outboundMsgHolder); + } + }); + closeChannelAndEvict(http2ClientChannel); + } + }); + } + + public void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) { + boolean result = http2StaleClientChannels.remove(http2ClientChannel); + if (!result) { + logger.warn("Specified channel does not exist in the stale list."); + } + http2ClientChannel.getConnection() + .close(new DefaultPromise(new DefaultEventLoop())); + } + }; + timer.schedule(timerTask, poolConfiguration.getTimeBetweenStaleEviction(), + poolConfiguration.getTimeBetweenStaleEviction()); + } + private Http2ChannelPool.PerRouteConnectionPool fetchPerRoutePool(HttpRoute httpRoute) { String key = generateKey(httpRoute); return this.http2ChannelPool.fetchPerRoutePool(key); diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2TargetHandler.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2TargetHandler.java index 018a3805e9..b1031cc23c 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2TargetHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2TargetHandler.java @@ -265,8 +265,11 @@ private void onResetRead(Http2Reset http2Reset) { int streamId = http2Reset.getStreamId(); OutboundMsgHolder outboundMsgHolder = http2ClientChannel.getInFlightMessage(streamId); if (outboundMsgHolder != null) { - outboundMsgHolder.getResponseFuture() - .notifyHttpListener(new Exception("HTTP/2 stream " + streamId + " reset by the remote peer")); + Http2MessageStateContext messageStateContext = + outboundMsgHolder.getRequest().getHttp2MessageStateContext(); + if (messageStateContext != null) { + messageStateContext.getSenderState().handleRstStream(outboundMsgHolder); + } } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/EntityBodyReceived.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/EntityBodyReceived.java index 2376d03276..932180bbc4 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/EntityBodyReceived.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/EntityBodyReceived.java @@ -118,4 +118,14 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { LOG.debug("Channel is closed"); } } + + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + LOG.warn("handleServerGoAway is not a dependant action of this state"); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + LOG.warn("handleRstStream is not a dependant action of this state"); + } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingEntityBody.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingEntityBody.java index 2ee6af2ef8..ac059d85c7 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingEntityBody.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingEntityBody.java @@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory; import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_BODY; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_BODY; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2StateUtil.releaseContent; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.StateUtil.handleIncompleteInboundMessage; @@ -117,6 +119,18 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY); } + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), + REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_BODY); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), + REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_BODY); + } + private void onDataRead(Http2DataFrame http2DataFrame, OutboundMsgHolder outboundMsgHolder, boolean serverPush, Http2MessageStateContext http2MessageStateContext) { int streamId = http2DataFrame.getStreamId(); diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingHeaders.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingHeaders.java index 0aa5bc74d3..6301d9b475 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingHeaders.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/ReceivingHeaders.java @@ -59,6 +59,8 @@ import static io.ballerina.stdlib.http.transport.contract.Constants.INBOUND_RESPONSE; import static io.ballerina.stdlib.http.transport.contract.Constants.POOLED_BYTE_BUFFER_FACTORY; import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_HEADERS; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_HEADERS; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2StateUtil.releaseContent; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.StateUtil.handleIncompleteInboundMessage; import static io.netty.handler.codec.http.HttpHeaderNames.TRAILER; @@ -141,6 +143,18 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_HEADERS); } + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), + REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), + REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_HEADERS); + } + private void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame http2HeadersFrame, OutboundMsgHolder outboundMsgHolder, boolean serverPush, Http2MessageStateContext http2MessageStateContext) { diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/RequestCompleted.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/RequestCompleted.java index 013644dc74..af2488572b 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/RequestCompleted.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/RequestCompleted.java @@ -36,6 +36,8 @@ import static io.ballerina.stdlib.http.transport.contract.Constants.IDLE_TIMEOUT_TRIGGERED_BEFORE_INITIATING_INBOUND_RESPONSE; import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_RST_STREAM_BEFORE_INITIATING_INBOUND_RESPONSE; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2StateUtil.onPushPromiseRead; /** @@ -112,4 +114,16 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { new ServerConnectorException(REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE)); LOG.error(REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE); } + + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getResponseFuture().notifyHttpListener( + new ServerConnectorException(REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE)); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getResponseFuture().notifyHttpListener( + new ServerConnectorException(REMOTE_SERVER_SENT_RST_STREAM_BEFORE_INITIATING_INBOUND_RESPONSE)); + } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SenderState.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SenderState.java index 2dbcf78b5b..2a18ce8390 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SenderState.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SenderState.java @@ -103,4 +103,14 @@ void handleStreamTimeout(OutboundMsgHolder outboundMsgHolder, boolean serverPush * Handles the connection close event. */ void handleConnectionClose(OutboundMsgHolder outboundMsgHolder); + + /** + * Handles the Stream close event due to receiving GoAway frame. + */ + void handleServerGoAway(OutboundMsgHolder outboundMsgHolder); + + /** + * Handles the Stream close event due to receiving RSTStream frame. + */ + void handleRstStream(OutboundMsgHolder outboundMsgHolder); } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingEntityBody.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingEntityBody.java index 83fd4c30ba..47de712cde 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingEntityBody.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingEntityBody.java @@ -19,6 +19,7 @@ package io.ballerina.stdlib.http.transport.contractimpl.sender.states.http2; import io.ballerina.stdlib.http.transport.contract.exceptions.EndpointTimeOutException; +import io.ballerina.stdlib.http.transport.contract.exceptions.RequestCancelledException; import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext; import io.ballerina.stdlib.http.transport.contractimpl.sender.http2.Http2ClientChannel; import io.ballerina.stdlib.http.transport.contractimpl.sender.http2.Http2DataEventListener; @@ -44,6 +45,8 @@ import static io.ballerina.stdlib.http.transport.contract.Constants.IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_BODY; import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_BODY; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_BODY; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_BODY; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2StateUtil.onPushPromiseRead; /** @@ -137,6 +140,24 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { HttpResponseStatus.GATEWAY_TIMEOUT.code())); } + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getRequest().setIoException( + new IOException(REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_BODY)); + outboundMsgHolder.getResponseFuture().notifyHttpListener(new RequestCancelledException( + REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_BODY, + HttpResponseStatus.BAD_GATEWAY.code())); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getRequest().setIoException( + new IOException(REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_BODY)); + outboundMsgHolder.getResponseFuture().notifyHttpListener(new RequestCancelledException( + REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_BODY, + HttpResponseStatus.BAD_GATEWAY.code())); + } + private void writeContent(ChannelHandlerContext ctx, HttpContent msg) throws Http2Exception { boolean release = true; try { diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingHeaders.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingHeaders.java index 49ef3470d2..7298c3e1db 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingHeaders.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingHeaders.java @@ -19,6 +19,7 @@ package io.ballerina.stdlib.http.transport.contractimpl.sender.states.http2; import io.ballerina.stdlib.http.transport.contract.exceptions.EndpointTimeOutException; +import io.ballerina.stdlib.http.transport.contract.exceptions.RequestCancelledException; import io.ballerina.stdlib.http.transport.contractimpl.common.Util; import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext; import io.ballerina.stdlib.http.transport.contractimpl.sender.http2.Http2ClientChannel; @@ -53,6 +54,8 @@ import static io.ballerina.stdlib.http.transport.contract.Constants.INBOUND_RESPONSE_ALREADY_RECEIVED; import static io.ballerina.stdlib.http.transport.contract.Constants.PROTOCOL; import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2StateUtil.initiateStream; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2StateUtil.writeHttp2Headers; @@ -140,6 +143,20 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { HttpResponseStatus.GATEWAY_TIMEOUT.code())); } + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getResponseFuture().notifyHttpListener(new RequestCancelledException( + REMOTE_SERVER_SENT_GOAWAY_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS, + HttpResponseStatus.BAD_GATEWAY.code())); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getResponseFuture().notifyHttpListener(new RequestCancelledException( + REMOTE_SERVER_SENT_RST_STREAM_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS, + HttpResponseStatus.BAD_GATEWAY.code())); + } + private void writeHeaders(ChannelHandlerContext ctx, HttpContent msg) throws Http2Exception { // Initiate the stream boolean endStream = false; diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingRstFrame.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingRstFrame.java index 789ea000d1..1808b0ab46 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingRstFrame.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/SendingRstFrame.java @@ -19,6 +19,8 @@ package io.ballerina.stdlib.http.transport.contractimpl.sender.states.http2; import io.ballerina.stdlib.http.transport.contract.exceptions.EndpointTimeOutException; +import io.ballerina.stdlib.http.transport.contract.exceptions.RequestCancelledException; +import io.ballerina.stdlib.http.transport.contract.exceptions.ServerConnectorException; import io.ballerina.stdlib.http.transport.contractimpl.common.states.Http2MessageStateContext; import io.ballerina.stdlib.http.transport.contractimpl.sender.http2.Http2ClientChannel; import io.ballerina.stdlib.http.transport.contractimpl.sender.http2.Http2TargetHandler; @@ -35,8 +37,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.ballerina.stdlib.http.transport.contract.Constants.IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS; -import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS; +import static io.ballerina.stdlib.http.transport.contract.Constants.IDLE_TIMEOUT_TRIGGERED_WHILE_SENDING_RST_STREAM; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_SENDING_RST_STREAM; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_SENDING_RST_STREAM; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_SENDING_RST_STREAM; /** * A state to reset the stream in the middle of communication. @@ -100,16 +104,29 @@ public void handleStreamTimeout(OutboundMsgHolder outboundMsgHolder, boolean ser int streamId) throws Http2Exception { if (!serverPush) { outboundMsgHolder.getResponseFuture().notifyHttpListener( - new EndpointTimeOutException(IDLE_TIMEOUT_TRIGGERED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS, + new EndpointTimeOutException(IDLE_TIMEOUT_TRIGGERED_WHILE_SENDING_RST_STREAM, HttpResponseStatus.GATEWAY_TIMEOUT.code())); } } @Override public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { - outboundMsgHolder.getResponseFuture().notifyHttpListener(new EndpointTimeOutException( - REMOTE_SERVER_CLOSED_WHILE_WRITING_OUTBOUND_REQUEST_HEADERS, - HttpResponseStatus.GATEWAY_TIMEOUT.code())); + outboundMsgHolder.getResponseFuture().notifyHttpListener(new ServerConnectorException( + REMOTE_SERVER_CLOSED_WHILE_SENDING_RST_STREAM)); + } + + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getResponseFuture().notifyHttpListener(new RequestCancelledException( + REMOTE_SERVER_SENT_GOAWAY_WHILE_SENDING_RST_STREAM, + HttpResponseStatus.BAD_GATEWAY.code())); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + outboundMsgHolder.getResponseFuture().notifyHttpListener(new RequestCancelledException( + REMOTE_SERVER_SENT_RST_STREAM_WHILE_SENDING_RST_STREAM, + HttpResponseStatus.BAD_GATEWAY.code())); } public void resetStream(ChannelHandlerContext ctx) { diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/WaitingFor100Continue.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/WaitingFor100Continue.java index 10715c495d..dbb26cfbb3 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/WaitingFor100Continue.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/states/http2/WaitingFor100Continue.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledFuture; import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_HEADERS; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS; import static io.ballerina.stdlib.http.transport.contractimpl.common.states.StateUtil.handleIncompleteInboundMessage; /** @@ -151,4 +152,16 @@ public void handleConnectionClose(OutboundMsgHolder outboundMsgHolder) { handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_HEADERS); } + + @Override + public void handleServerGoAway(OutboundMsgHolder outboundMsgHolder) { + handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), + REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS); + } + + @Override + public void handleRstStream(OutboundMsgHolder outboundMsgHolder) { + handleIncompleteInboundMessage(outboundMsgHolder.getResponse(), + REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_HEADERS); + } } diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/TestExhaustedStreamIdForClient.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/TestExhaustedStreamIdForClient.java index 46bff8fdc7..fb62bf1ab4 100644 --- a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/TestExhaustedStreamIdForClient.java +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/TestExhaustedStreamIdForClient.java @@ -100,7 +100,7 @@ public void testExhaustedStreamId() throws Http2Exception { assertEquals(firstResult, testValue, "Expected response not received"); Http2ClientChannel http2ClientChannel = connectionManager.getHttp2ConnectionManager() - .borrowChannel(new HttpRoute(Constants.HTTP_SCHEME, LOCALHOST, + .fetchChannel(new HttpRoute(Constants.HTTP_SCHEME, LOCALHOST, HTTP_SERVER_PORT, 0)); //Simulate the stream id to have reached its max value for the connection. @@ -111,7 +111,7 @@ public void testExhaustedStreamId() throws Http2Exception { Throwable firstError = new MessageSender(httpClientConnector).sendMessageAndExpectError(secondMessage); assertNotNull(firstError, "Expected error not received"); assertEquals(firstError.getMessage(), "No more streams can be created on this connection", - "Expected error response not received"); + "Expected error response not received"); //Send another request using the same client and it should not fail HttpCarbonMessage thirdMessage = MessageGenerator.generateRequest(HttpMethod.POST, testValue); diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/clientchannelclose/RemoteChannelCloseWithSSLError.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/clientchannelclose/RemoteChannelCloseWithSSLError.java index 102b4ca711..70ddf3d5d7 100644 --- a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/clientchannelclose/RemoteChannelCloseWithSSLError.java +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/clientchannelclose/RemoteChannelCloseWithSSLError.java @@ -70,7 +70,7 @@ public void setup() throws InterruptedException { h2PriorOnClient = getTestHttp2Client(connectorFactory, true); } - //TODO:Change the assertion state once the issue https://githubcom/ballerina-platform/ballerina-lang/issues/17539 + //TODO:Change the assertion state once the issue https://github.com/ballerina-platform/ballerina-lang/issues/17539 // is fixed. @Test public void testRemoteChannelClose() { diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/FrameLevelTestUtils.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/FrameLevelTestUtils.java new file mode 100644 index 0000000000..a3c9dfa072 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/FrameLevelTestUtils.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.contract.HttpWsConnectorFactory; +import io.ballerina.stdlib.http.transport.contract.config.SenderConfiguration; +import io.ballerina.stdlib.http.transport.contract.config.TransportsConfiguration; +import io.ballerina.stdlib.http.transport.contractimpl.DefaultHttpWsConnectorFactory; +import io.ballerina.stdlib.http.transport.message.HttpConnectorUtil; + +/** + * This contains the utils required for frame level tests. + */ +public class FrameLevelTestUtils { + + public static final int SLEEP_TIME = 100; + public static final int END_SLEEP_TIME = 1000; + + public static final byte[] SETTINGS_FRAME = new byte[]{0x00, 0x00, 0x06, 0x04, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x64, 0x64}; + public static final byte[] SETTINGS_FRAME_WITH_ACK = + new byte[]{0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00}; + public static final byte[] HEADER_FRAME_STREAM_03 = new byte[]{0x00, 0x00, 0x0a, 0x01, 0x04, 0x00, 0x00, 0x00, 0x03, + (byte) 0x88, 0x5f, (byte) 0x87, 0x49, 0x7c, (byte) 0xa5, (byte) 0x8a, (byte) 0xe8, 0x19, (byte) 0xaa}; + public static final byte[] HEADER_FRAME_STREAM_05 = new byte[]{0x00, 0x00, 0x0a, 0x01, 0x04, 0x00, 0x00, 0x00, 0x05, + (byte) 0x88, 0x5f, (byte) 0x87, 0x49, 0x7c, (byte) 0xa5, (byte) 0x8a, (byte) 0xe8, 0x19, (byte) 0xaa}; + public static final byte[] HEADER_FRAME_STREAM_07 = new byte[]{0x00, 0x00, 0x0a, 0x01, 0x04, 0x00, 0x00, 0x00, 0x07, + (byte) 0x88, 0x5f, (byte) 0x87, 0x49, 0x7c, (byte) 0xa5, (byte) 0x8a, (byte) 0xe8, 0x19, (byte) 0xaa}; + public static final byte[] HEADER_FRAME_STREAM_09 = new byte[]{0x00, 0x00, 0x0a, 0x01, 0x04, 0x00, 0x00, 0x00, 0x09, + (byte) 0x88, 0x5f, (byte) 0x87, 0x49, 0x7c, (byte) 0xa5, (byte) 0x8a, (byte) 0xe8, 0x19, (byte) 0xaa}; + public static final byte[] DATA_FRAME_STREAM_03 = new byte[]{0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x33}; + public static final byte[] DATA_FRAME_STREAM_03_DIFFERENT_DATA = new byte[]{0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x34}; + public static final byte[] DATA_FRAME_STREAM_05 = new byte[]{0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x35}; + public static final byte[] DATA_FRAME_STREAM_07 = new byte[]{0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, 0x00, 0x00, 0x07, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x37}; + public static final byte[] DATA_FRAME_STREAM_09 = new byte[]{0x00, 0x00, 0x0c, 0x00, 0x01, 0x00, 0x00, 0x00, 0x09, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x39}; + public static final byte[] GO_AWAY_FRAME_MAX_STREAM_01 = new byte[]{0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b}; + public static final byte[] GO_AWAY_FRAME_MAX_STREAM_03 = new byte[]{0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x0b}; + public static final byte[] GO_AWAY_FRAME_MAX_STREAM_05 = new byte[]{0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x0b}; + public static final byte[] GO_AWAY_FRAME_MAX_STREAM_07 = new byte[]{0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0b}; + public static final byte[] RST_STREAM_FRAME_STREAM_03 = new byte[]{0x00, 0x00, 0x04, 0x03, 0x00, 0x00, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x02}; + public static final byte[] RST_STREAM_FRAME_STREAM_07 = new byte[]{0x00, 0x00, 0x04, 0x03, 0x00, 0x00, 0x00, + 0x00, 0x07, 0x00, 0x00, 0x00, 0x02}; + public static final String DATA_VALUE_HELLO_WORLD_03 = "hello world3"; + public static final String DATA_VALUE_HELLO_WORLD_04 = "hello world4"; + public static final String DATA_VALUE_HELLO_WORLD_05 = "hello world5"; + public static final String DATA_VALUE_HELLO_WORLD_07 = "hello world7"; + + public static HttpClientConnector setupHttp2PriorKnowledgeClient() { + HttpWsConnectorFactory connectorFactory = new DefaultHttpWsConnectorFactory(); + TransportsConfiguration transportsConfiguration = new TransportsConfiguration(); + SenderConfiguration senderConfiguration = new SenderConfiguration(); + senderConfiguration.setScheme(Constants.HTTP_SCHEME); + senderConfiguration.setHttpVersion(Constants.HTTP_2_0); + senderConfiguration.setForceHttp2(true); + return connectorFactory.createHttpClientConnector( + HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2ConnectionEvictionAfterTcpServerGoAwayScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2ConnectionEvictionAfterTcpServerGoAwayScenarioTest.java new file mode 100644 index 0000000000..a2a1ef1416 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2ConnectionEvictionAfterTcpServerGoAwayScenarioTest.java @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.contract.HttpWsConnectorFactory; +import io.ballerina.stdlib.http.transport.contract.config.SenderConfiguration; +import io.ballerina.stdlib.http.transport.contract.config.TransportsConfiguration; +import io.ballerina.stdlib.http.transport.contractimpl.DefaultHttpWsConnectorFactory; +import io.ballerina.stdlib.http.transport.contractimpl.sender.channel.pool.PoolConfiguration; +import io.ballerina.stdlib.http.transport.message.HttpCarbonMessage; +import io.ballerina.stdlib.http.transport.message.HttpConnectorUtil; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import io.netty.util.CharsetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE; +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests + .FrameLevelTestUtils.DATA_FRAME_STREAM_03_DIFFERENT_DATA; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_04; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getDecoderErrorMessage; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertEqualsNoOrder; +import static org.testng.Assert.fail; + +/** + * This contains a test case where the tcp server sends a GoAway and the connection gets timed out from client side. + */ +public class Http2ConnectionEvictionAfterTcpServerGoAwayScenarioTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(Http2ConnectionEvictionAfterTcpServerGoAwayScenarioTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + private int numOfConnections = 0; + + public HttpClientConnector setupHttp2PriorKnowledgeClient(long minIdleTimeInStaleState, + long timeBetweenStaleEviction) { + HttpWsConnectorFactory connectorFactory = new DefaultHttpWsConnectorFactory(); + PoolConfiguration poolConfiguration = new PoolConfiguration(); + poolConfiguration.setMinIdleTimeInStaleState(minIdleTimeInStaleState); + poolConfiguration.setTimeBetweenStaleEviction(timeBetweenStaleEviction); + TransportsConfiguration transportsConfiguration = new TransportsConfiguration(); + SenderConfiguration senderConfiguration = new SenderConfiguration(); + senderConfiguration.setPoolConfiguration(poolConfiguration); + senderConfiguration.setScheme(Constants.HTTP_SCHEME); + senderConfiguration.setHttpVersion(Constants.HTTP_2_0); + senderConfiguration.setForceHttp2(true); + return connectorFactory.createHttpClientConnector( + HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration); + } + + @Test + private void testConnectionEvictionAfterAllStreamsAreClosedScenario() { + try { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + // Setting to -1 will make the runner to wait until all pending streams are completed + h2ClientWithPriorKnowledge = setupHttp2PriorKnowledgeClient(-1, 1000); + CountDownLatch latch1 = new CountDownLatch(2); + DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge); + DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge); + latch1.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + + CountDownLatch latch2 = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(latch2, h2ClientWithPriorKnowledge); + latch2.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + + HttpCarbonMessage response1 = msgListener1.getHttpResponseMessage(); + HttpCarbonMessage response2 = msgListener2.getHttpResponseMessage(); + HttpCarbonMessage response3 = msgListener3.getHttpResponseMessage(); + + Object responseVal1 = response1.getHttpContent().content().toString(CharsetUtil.UTF_8); + Object responseVal2 = response2.getHttpContent().content().toString(CharsetUtil.UTF_8); + Object responseVal3 = response3.getHttpContent().content().toString(CharsetUtil.UTF_8); + + assertEqualsNoOrder(List.of(responseVal1, responseVal2), List.of(DATA_VALUE_HELLO_WORLD_03, + DATA_VALUE_HELLO_WORLD_05)); + assertEquals(responseVal3, DATA_VALUE_HELLO_WORLD_04); + } catch (InterruptedException | IOException e) { + LOGGER.error("Exception occurred"); + fail(); + } + } + + @Test + private void testConnectionEvictionBeforeAllStreamsAreClosedScenario() { + try { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + // Setting to -1 will make the runner to wait until all pending streams are completed + h2ClientWithPriorKnowledge = setupHttp2PriorKnowledgeClient(5000, 1000); + CountDownLatch latch1 = new CountDownLatch(2); + DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge); + DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(latch1, h2ClientWithPriorKnowledge); + latch1.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + + CountDownLatch latch2 = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(latch2, h2ClientWithPriorKnowledge); + latch2.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + + String errorMsg1 = msgListener1.getHttpErrorMessage() != null ? getErrorResponseMessage(msgListener1) : + getDecoderErrorMessage(msgListener1); + String errorMsg2 = msgListener2.getHttpErrorMessage() != null ? getErrorResponseMessage(msgListener2) : + getDecoderErrorMessage(msgListener2); + + assertEqualsNoOrder(List.of(errorMsg1, errorMsg2), + List.of(REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE, + REMOTE_SERVER_CLOSED_WHILE_READING_INBOUND_RESPONSE_BODY)); + assertEquals(getResponseMessage(msgListener3), DATA_VALUE_HELLO_WORLD_04); + } catch (InterruptedException | IOException e) { + LOGGER.error("Exception occurred"); + fail(); + } + } + + private void runTcpServer(int port) throws IOException { + if (serverSocket != null) { + serverSocket.close(); + } + numOfConnections = 0; + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + while (numOfConnections < 2) { + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + if (numOfConnections == 0) { + sendGoAwayForASingleStream(outputStream); + } else { + // If the connection successfully closed, a new socket connection + // will be opened and it will come here + sendSuccessfulRequest(outputStream); + } + numOfConnections += 1; + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendGoAwayForASingleStream(OutputStream outputStream) throws IOException, InterruptedException { + outputStream.write(SETTINGS_FRAME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + // This will move the connection to the stale connections list + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_05); + // Sleeping for 8 seconds and the timer task will check whether there are inflight message still + // remaining in the channel. + Thread.sleep(8000); + outputStream.write(DATA_FRAME_STREAM_03); + outputStream.write(HEADER_FRAME_STREAM_05); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_05); + // Once all the inflight messages are completed, the connection will be closed. + Thread.sleep(8000); + } + + private void sendSuccessfulRequest(OutputStream outputStream) throws IOException, InterruptedException { + outputStream.write(SETTINGS_FRAME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_03_DIFFERENT_DATA); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterClass + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2ServerAbruptClosureInUpgradeScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2ServerAbruptClosureInUpgradeScenarioTest.java new file mode 100644 index 0000000000..3d73b2d0c2 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2ServerAbruptClosureInUpgradeScenarioTest.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.contract.HttpWsConnectorFactory; +import io.ballerina.stdlib.http.transport.contract.config.SenderConfiguration; +import io.ballerina.stdlib.http.transport.contract.config.TransportsConfiguration; +import io.ballerina.stdlib.http.transport.contractimpl.DefaultHttpWsConnectorFactory; +import io.ballerina.stdlib.http.transport.message.HttpConnectorUtil; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.contract.Constants.REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage; +import static org.testng.Assert.fail; +import static org.testng.AssertJUnit.assertEquals; + +/** + * This contains a test case where the tcp server sends a GoAway and the connection gets timed out from client side. + */ +public class Http2ServerAbruptClosureInUpgradeScenarioTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(Http2ServerAbruptClosureInUpgradeScenarioTest.class); + private HttpClientConnector h2ClientWithUpgrade; + private ServerSocket serverSocket; + private int numOfConnections = 0; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithUpgrade = setupHttp2UpgradeClient(); + } + + public HttpClientConnector setupHttp2UpgradeClient() { + HttpWsConnectorFactory connectorFactory = new DefaultHttpWsConnectorFactory(); + TransportsConfiguration transportsConfiguration = new TransportsConfiguration(); + SenderConfiguration senderConfiguration = new SenderConfiguration(); + senderConfiguration.setScheme(Constants.HTTP_SCHEME); + senderConfiguration.setHttpVersion(Constants.HTTP_2_0); + senderConfiguration.setForceHttp2(false); + return connectorFactory.createHttpClientConnector( + HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration); + } + + @Test + private void testServerAbruptClosureInUpgradeScenario() { + try { + CountDownLatch latch1 = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch1, h2ClientWithUpgrade); + latch1.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + + CountDownLatch latch2 = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(latch2, h2ClientWithUpgrade); + latch2.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + + assertEquals(getErrorResponseMessage(msgListener1), + REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE); + assertEquals(getErrorResponseMessage(msgListener2), + REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + fail(); + } + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + while (numOfConnections < 2) { + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream(); + InputStream inputStream = clientSocket.getInputStream()) { + readSocketAndExit(inputStream); + numOfConnections += 1; + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void readSocketAndExit(InputStream inputStream) throws IOException { + // This will just read the socket input content and exit the socket without sending any response + // which will trigger the channel inactive in the client side + byte[] buffer = new byte[4096]; + int bytesRead = 0; + try { + bytesRead = inputStream.read(buffer); + } catch (Exception exception) { + LOGGER.error(exception.getMessage()); + } + String data = new String(buffer, 0, bytesRead); + LOGGER.info("Received upgrade requesst: " + data); + } + + @AfterClass + public void cleanUp() throws IOException { + h2ClientWithUpgrade.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwayMultipleStreamScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwayMultipleStreamScenarioTest.java new file mode 100644 index 0000000000..47f6252a3b --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwayMultipleStreamScenarioTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_07; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_09; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_07; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_07; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_07; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_09; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage; +import static org.testng.Assert.assertEqualsNoOrder; + +/** + * This contains a test case where the tcp server sends a GoAway for a stream in a multiple stream scenario. + */ +public class Http2TcpServerGoAwayMultipleStreamScenarioTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(Http2TcpServerGoAwayMultipleStreamScenarioTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testGoAwayWhenReceivingHeadersInAMultipleStreamScenario() { + CountDownLatch latch = new CountDownLatch(4); + DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + DefaultHttpConnectorListener msgListener4 = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + try { + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + + Object responseValOrError1 = msgListener1.getHttpResponseMessage() == null ? + getErrorResponseMessage(msgListener1) : getResponseMessage(msgListener1); + Object responseValOrError2 = msgListener2.getHttpResponseMessage() == null ? + getErrorResponseMessage(msgListener2) : getResponseMessage(msgListener2); + Object responseValOrError3 = msgListener3.getHttpResponseMessage() == null ? + getErrorResponseMessage(msgListener3) : getResponseMessage(msgListener3); + Object responseValOrError4 = msgListener4.getHttpResponseMessage() == null ? + getErrorResponseMessage(msgListener4) : getResponseMessage(msgListener4); + assertEqualsNoOrder(List.of(responseValOrError1, responseValOrError2, responseValOrError3, + responseValOrError4), List.of(DATA_VALUE_HELLO_WORLD_03, DATA_VALUE_HELLO_WORLD_05, + DATA_VALUE_HELLO_WORLD_07, Constants.REMOTE_SERVER_SENT_GOAWAY_BEFORE_INITIATING_INBOUND_RESPONSE)); + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + sendGoAwayForASingleStreamInAMultipleStreamScenario(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendGoAwayForASingleStreamInAMultipleStreamScenario(OutputStream outputStream) + throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_05); + outputStream.write(DATA_FRAME_STREAM_05); + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_07); + Thread.sleep(SLEEP_TIME); + // Sending the frames for higher streams to check whether client correctly ignores them. + outputStream.write(HEADER_FRAME_STREAM_09); + outputStream.write(DATA_FRAME_STREAM_09); + Thread.sleep(SLEEP_TIME); + // Sending the frames for lower streams to check whether client correctly accepts them. + outputStream.write(HEADER_FRAME_STREAM_07); + outputStream.write(DATA_FRAME_STREAM_07); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_03); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterClass + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwaySingleStreamScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwaySingleStreamScenarioTest.java new file mode 100644 index 0000000000..501b6e13fd --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwaySingleStreamScenarioTest.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage; +import static org.testng.Assert.assertEquals; + +/** + * This contains a test case where the tcp server sends a GoAway for a single request. + */ +public class Http2TcpServerGoAwaySingleStreamScenarioTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(Http2TcpServerGoAwaySingleStreamScenarioTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + + @BeforeMethod + public void setup(Method method) throws InterruptedException { + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test (description = "In this, server sends headers and data for the accepted stream") + private void testGoAwayWhenReceivingHeadersForASingleStream() { + runTcpServer(TestUtil.HTTP_SERVER_PORT, 1); + try { + CountDownLatch latch = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + assertEquals(getResponseMessage(msgListener), DATA_VALUE_HELLO_WORLD_03); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + } + + @Test (description = "In this, server exits without sending the headers and data for the accepted stream as well") + private void testGoAwayAndServerExitWhenReceivingHeadersForASingleStream() { + runTcpServer(TestUtil.HTTP_SERVER_PORT, 2); + CountDownLatch latch = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + try { + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + assertEquals(getErrorResponseMessage(msgListener), + Constants.REMOTE_SERVER_CLOSED_BEFORE_INITIATING_INBOUND_RESPONSE); + } + + private void runTcpServer(int port, int option) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + if (option == 1) { + sendGoAwayForASingleStream(outputStream); + } else { + sendGoAwayAndExitForASingleStream(outputStream); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendGoAwayAndExitForASingleStream(OutputStream outputStream) + throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_03); + // Once the sleep time elapses, channel inactive of client gets fired. + Thread.sleep(SLEEP_TIME); + } + + private void sendGoAwayForASingleStream(OutputStream outputStream) throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_03); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwayWhileReceivingBodyScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwayWhileReceivingBodyScenarioTest.java new file mode 100644 index 0000000000..06312df11c --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerGoAwayWhileReceivingBodyScenarioTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_01; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getDecoderErrorMessage; +import static org.testng.Assert.assertEquals; + +/** + * This contains a test case where the tcp server sends a GoAway while client receives the body. + */ +public class Http2TcpServerGoAwayWhileReceivingBodyScenarioTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(Http2TcpServerGoAwayWhileReceivingBodyScenarioTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testGoAwayWhenReceivingBodyForASingleStream() { + CountDownLatch latch = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + try { + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + assertEquals(getDecoderErrorMessage(msgListener), + Constants.REMOTE_SERVER_SENT_GOAWAY_WHILE_READING_INBOUND_RESPONSE_BODY); + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + sendGoAwayAfterSendingHeadersForASingleStream(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendGoAwayAfterSendingHeadersForASingleStream(OutputStream outputStream) + throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_01); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameForMultipleStreamsTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameForMultipleStreamsTest.java new file mode 100644 index 0000000000..224d58c505 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameForMultipleStreamsTest.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.Semaphore; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_05; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_07; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.RST_STREAM_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.RST_STREAM_FRAME_STREAM_07; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getDecoderErrorMessage; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +/** + * This contains a test case where the tcp server sends a successful response. + */ +public class Http2TcpServerRSTStreamFrameForMultipleStreamsTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(Http2TcpServerRSTStreamFrameForMultipleStreamsTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + Semaphore readSemaphore = new Semaphore(0); + Semaphore writeSemaphore = new Semaphore(0); + private ServerSocket serverSocket; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testRSTStreamFrameForMultipleStreams() { + try { + DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + writeSemaphore.release(); + readSemaphore.acquire(); + DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + writeSemaphore.release(); + readSemaphore.acquire(); + DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + writeSemaphore.release(); + readSemaphore.acquire(); + assertEquals(getErrorResponseMessage(msgListener1), + Constants.REMOTE_SERVER_SENT_RST_STREAM_BEFORE_INITIATING_INBOUND_RESPONSE); + assertEquals(getResponseMessage(msgListener2), DATA_VALUE_HELLO_WORLD_05); + assertEquals(getDecoderErrorMessage(msgListener3), + Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_BODY); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + fail(); + } + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + sendRSTStream(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } finally { + readSemaphore.release(); + writeSemaphore.release(); + } + }).start(); + } + + // This will send an RST_STREAM frame for stream 3 before sending headers and a successful response for + // stream 5 and an RST_STREAM frame for stream 7 after sending headers + private void sendRSTStream(OutputStream outputStream) throws IOException, InterruptedException { + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + writeSemaphore.acquire(); + outputStream.write(RST_STREAM_FRAME_STREAM_03); + readSemaphore.release(); + Thread.sleep(SLEEP_TIME); + writeSemaphore.acquire(); + outputStream.write(HEADER_FRAME_STREAM_05); + outputStream.write(DATA_FRAME_STREAM_05); + Thread.sleep(SLEEP_TIME); + readSemaphore.release(); + writeSemaphore.acquire(); + outputStream.write(HEADER_FRAME_STREAM_07); + Thread.sleep(SLEEP_TIME); + outputStream.write(RST_STREAM_FRAME_STREAM_07); + Thread.sleep(SLEEP_TIME); + readSemaphore.release(); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameForSingleStreamTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameForSingleStreamTest.java new file mode 100644 index 0000000000..785cbbf535 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameForSingleStreamTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.RST_STREAM_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getErrorResponseMessage; +import static org.testng.Assert.assertEquals; + +/** + * This contains a test case where the tcp server sends a successful response. + */ +public class Http2TcpServerRSTStreamFrameForSingleStreamTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(Http2TcpServerRSTStreamFrameForSingleStreamTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testRSTStreamFrameForSingleStream() { + CountDownLatch latch = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + try { + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + assertEquals(getErrorResponseMessage(msgListener), + Constants.REMOTE_SERVER_SENT_RST_STREAM_BEFORE_INITIATING_INBOUND_RESPONSE); + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + sendRSTStream(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendRSTStream(OutputStream outputStream) throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(RST_STREAM_FRAME_STREAM_03); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameWhenReadingBodyForSingleStreamTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameWhenReadingBodyForSingleStreamTest.java new file mode 100644 index 0000000000..b57daff8e9 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerRSTStreamFrameWhenReadingBodyForSingleStreamTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.Constants; +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.RST_STREAM_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getDecoderErrorMessage; +import static org.testng.Assert.assertEquals; + +/** + * This contains a test case where the tcp server sends a successful response. + */ +public class Http2TcpServerRSTStreamFrameWhenReadingBodyForSingleStreamTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(Http2TcpServerRSTStreamFrameWhenReadingBodyForSingleStreamTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testRSTStreamFrameWhenReadingBodyForSingleStream() { + CountDownLatch latch = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + try { + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + assertEquals(getDecoderErrorMessage(msgListener), + Constants.REMOTE_SERVER_SENT_RST_STREAM_WHILE_READING_INBOUND_RESPONSE_BODY); + + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + sendRSTStream(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendRSTStream(OutputStream outputStream) throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(RST_STREAM_FRAME_STREAM_03); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerSendGoAwayForAllStreamsScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerSendGoAwayForAllStreamsScenarioTest.java new file mode 100644 index 0000000000..d6f4b07c24 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerSendGoAwayForAllStreamsScenarioTest.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.Semaphore; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils + .DATA_FRAME_STREAM_03_DIFFERENT_DATA; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_04; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.GO_AWAY_FRAME_MAX_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +/** + * This contains a test case where the client sends a request after receiving a GoAway. + * This tests whether there is a new connection opened from the client. + */ +public class Http2TcpServerSendGoAwayForAllStreamsScenarioTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(Http2TcpServerSendGoAwayForAllStreamsScenarioTest.class); + + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + Semaphore semaphore = new Semaphore(0); + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testGoAwayForAllStreamsScenario() { + try { + DefaultHttpConnectorListener msgListener1 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + semaphore.acquire(); + DefaultHttpConnectorListener msgListener2 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + semaphore.acquire(); + DefaultHttpConnectorListener msgListener3 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + semaphore.acquire(); + DefaultHttpConnectorListener msgListener4 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + semaphore.acquire(); + DefaultHttpConnectorListener msgListener5 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + semaphore.acquire(); + DefaultHttpConnectorListener msgListener6 = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge); + semaphore.acquire(); + assertEquals(getResponseMessage(msgListener1), DATA_VALUE_HELLO_WORLD_03); + assertEquals(getResponseMessage(msgListener2), DATA_VALUE_HELLO_WORLD_04); + assertEquals(getResponseMessage(msgListener3), DATA_VALUE_HELLO_WORLD_03); + assertEquals(getResponseMessage(msgListener4), DATA_VALUE_HELLO_WORLD_04); + assertEquals(getResponseMessage(msgListener5), DATA_VALUE_HELLO_WORLD_03); + assertEquals(getResponseMessage(msgListener6), DATA_VALUE_HELLO_WORLD_04); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + fail(); + } + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + int numberOfConnections = 0; + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + while (numberOfConnections < 6) { + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + if (numberOfConnections % 2 == 0) { + sendGoAwayBeforeSendingHeaders(outputStream); + } else { + sendGoAwayAfterSendingHeaders(outputStream); + } + numberOfConnections += 1; + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendGoAwayBeforeSendingHeaders(OutputStream outputStream) throws IOException, InterruptedException { + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_03); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_03); + semaphore.release(); + Thread.sleep(END_SLEEP_TIME); + } + + private void sendGoAwayAfterSendingHeaders(OutputStream outputStream) throws IOException, InterruptedException { + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + outputStream.write(GO_AWAY_FRAME_MAX_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_03_DIFFERENT_DATA); + semaphore.release(); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerSuccessScenarioTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerSuccessScenarioTest.java new file mode 100644 index 0000000000..b240c77eb9 --- /dev/null +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/http2/frameleveltests/Http2TcpServerSuccessScenarioTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.http.transport.http2.frameleveltests; + +import io.ballerina.stdlib.http.transport.contract.HttpClientConnector; +import io.ballerina.stdlib.http.transport.util.DefaultHttpConnectorListener; +import io.ballerina.stdlib.http.transport.util.TestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.DATA_VALUE_HELLO_WORLD_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.END_SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.HEADER_FRAME_STREAM_03; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SETTINGS_FRAME_WITH_ACK; +import static io.ballerina.stdlib.http.transport.http2.frameleveltests.FrameLevelTestUtils.SLEEP_TIME; +import static io.ballerina.stdlib.http.transport.util.TestUtil.getResponseMessage; +import static org.testng.Assert.assertEquals; + +/** + * This contains a test case where the tcp server sends a successful response. + */ +public class Http2TcpServerSuccessScenarioTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(Http2TcpServerSuccessScenarioTest.class); + private HttpClientConnector h2ClientWithPriorKnowledge; + private ServerSocket serverSocket; + + @BeforeClass + public void setup() throws InterruptedException { + runTcpServer(TestUtil.HTTP_SERVER_PORT); + h2ClientWithPriorKnowledge = FrameLevelTestUtils.setupHttp2PriorKnowledgeClient(); + } + + @Test + private void testSuccessfulConnection() { + CountDownLatch latch = new CountDownLatch(1); + DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge); + try { + latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception occurred"); + } + assertEquals(getResponseMessage(msgListener), DATA_VALUE_HELLO_WORLD_03); + } + + private void runTcpServer(int port) { + new Thread(() -> { + try { + serverSocket = new ServerSocket(port); + LOGGER.info("HTTP/2 TCP Server listening on port " + port); + Socket clientSocket = serverSocket.accept(); + LOGGER.info("Accepted connection from: " + clientSocket.getInetAddress()); + try (OutputStream outputStream = clientSocket.getOutputStream()) { + sendSuccessfulResponse(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + }).start(); + } + + private void sendSuccessfulResponse(OutputStream outputStream) throws IOException, InterruptedException { + // Sending settings frame with HEADER_TABLE_SIZE=25700 + outputStream.write(SETTINGS_FRAME); + Thread.sleep(SLEEP_TIME); + outputStream.write(SETTINGS_FRAME_WITH_ACK); + Thread.sleep(SLEEP_TIME); + outputStream.write(HEADER_FRAME_STREAM_03); + Thread.sleep(SLEEP_TIME); + outputStream.write(DATA_FRAME_STREAM_03); + Thread.sleep(END_SLEEP_TIME); + } + + @AfterMethod + public void cleanUp() throws IOException { + h2ClientWithPriorKnowledge.close(); + serverSocket.close(); + } +} diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/util/DefaultHttpConnectorListener.java b/native/src/test/java/io/ballerina/stdlib/http/transport/util/DefaultHttpConnectorListener.java index 26034afde7..78a3958e83 100644 --- a/native/src/test/java/io/ballerina/stdlib/http/transport/util/DefaultHttpConnectorListener.java +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/util/DefaultHttpConnectorListener.java @@ -36,16 +36,23 @@ public DefaultHttpConnectorListener(CountDownLatch latch) { this.latch = latch; } + // This constructor can be used to create a listener without a latch when an external locking mechanism is in place. + public DefaultHttpConnectorListener() {} + @Override public void onMessage(HttpCarbonMessage httpMessage) { this.httpMessage = httpMessage; - latch.countDown(); + if (latch != null) { + latch.countDown(); + } } @Override public void onError(Throwable throwable) { this.throwable = throwable; - latch.countDown(); + if (latch != null) { + latch.countDown(); + } } public HttpCarbonMessage getHttpResponseMessage() { diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/util/TestUtil.java b/native/src/test/java/io/ballerina/stdlib/http/transport/util/TestUtil.java index 66f2f8057e..16db2f00f2 100644 --- a/native/src/test/java/io/ballerina/stdlib/http/transport/util/TestUtil.java +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/util/TestUtil.java @@ -37,8 +37,10 @@ import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -265,7 +267,8 @@ public static DefaultHttpConnectorListener sendRequestAsync(CountDownLatch latch HttpClientConnector httpClientConnector) { HttpCarbonMessage httpsPostReq = TestUtil. createHttpsPostReq(TestUtil.HTTP_SERVER_PORT, "hello", "/"); - DefaultHttpConnectorListener requestListener = new DefaultHttpConnectorListener(latch); + DefaultHttpConnectorListener requestListener = latch == null ? new DefaultHttpConnectorListener() : + new DefaultHttpConnectorListener(latch); HttpResponseFuture responseFuture = httpClientConnector.send(httpsPostReq); responseFuture.setHttpConnectorListener(requestListener); return requestListener; @@ -281,6 +284,20 @@ public static DefaultHttpConnectorListener sendRequestAsyncWithGivenPort(CountDo return requestListener; } + public static String getResponseMessage(DefaultHttpConnectorListener msgListener) { + HttpCarbonMessage response = msgListener.getHttpResponseMessage(); + return response.getHttpContent().content().toString(CharsetUtil.UTF_8); + } + + public static String getErrorResponseMessage(DefaultHttpConnectorListener msgListener) { + return msgListener.getHttpErrorMessage().getMessage(); + } + + public static String getDecoderErrorMessage(DefaultHttpConnectorListener msgListener) { + HttpContent content = msgListener.getHttpResponseMessage().getHttpContent(); + return content.decoderResult().cause().getMessage(); + } + public static void cleanUp(List serverConnectors, HttpServer httpServer) { for (ServerConnector httpServerConnector : serverConnectors) { if (!httpServerConnector.stop()) { diff --git a/native/src/test/resources/testng.xml b/native/src/test/resources/testng.xml index 6f7dbcd813..7944f41b74 100644 --- a/native/src/test/resources/testng.xml +++ b/native/src/test/resources/testng.xml @@ -154,6 +154,16 @@ + + + + + + + + + +