From f6d534108bbd49b6fa6a4fe1dade7b95ab7f0662 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Sun, 7 May 2023 15:19:20 +0530 Subject: [PATCH 1/8] Fix Passthrough Threads getting stucked due to request message discard Remove discard request message in ClientWorker since in httpcore-nio already clears the inPlain buffer. Also set suspendInput state on connection so that when inputReady state hits it won't go for handleInvalidState. Fixes: https://github.com/wso2/api-manager/issues/1792 --- .../synapse/transport/passthru/ClientWorker.java | 4 ---- .../synapse/transport/passthru/SourceContext.java | 11 +++++++++++ .../synapse/transport/passthru/SourceHandler.java | 8 +++++++- .../synapse/transport/passthru/TargetHandler.java | 6 ++++++ 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java index 3a2d7875d0..1f02fe907a 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java @@ -223,10 +223,6 @@ public void run() { getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis()); } try { - // If an error has happened in the request processing, consumes the data in pipe completely and discard it - if (response.isForceShutdownConnectionOnComplete()) { - RelayUtils.discardRequestMessage(requestMessageContext); - } if (expectEntityBody) { String cType = response.getHeader(HTTP.CONTENT_TYPE); diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java index 39fa57710d..8c20df7786 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java @@ -40,6 +40,8 @@ public class SourceContext { private SourceConfiguration sourceConfiguration; + private boolean isPipeMarkedToBeConsumed = false; + private ProtocolState state = ProtocolState.REQUEST_READY; private SourceRequest request; @@ -70,6 +72,15 @@ public ProtocolState getState() { return state; } + public boolean isPipeMarkedToBeConsumed() { + return isPipeMarkedToBeConsumed; + } + + public void setPipeMarkedToBeConsumed(boolean isPipeDiscarded) { + this.isPipeMarkedToBeConsumed = isPipeDiscarded; + } + + public void setState(ProtocolState state) { this.state = state; } diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java index e658643189..f5d13fb026 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java @@ -223,9 +223,15 @@ public void inputReady(NHttpServerConnection conn, conn.getContext().setAttribute(PassThroughConstants.REQ_FROM_CLIENT_BODY_READ_START_TIME, chunkReadStartTime); } ProtocolState protocolState = SourceContext.getState(conn); - if (protocolState != ProtocolState.REQUEST_HEAD && protocolState != ProtocolState.REQUEST_BODY) { + // This logic is added specifically here to avoid a race condition that can occur when + // inputReady is already called prior to suspendInput method is called in TargetHandler. + SourceContext sourceContext = (SourceContext) + conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); + if (sourceContext != null && sourceContext.isPipeMarkedToBeConsumed()) { + return; + } handleInvalidState(conn, "Request message body data received"); return; } diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index 3580131fd1..d7fa89c395 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -388,6 +388,12 @@ public void responseReceived(NHttpClientConnection conn) { NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); if (sourceConn != null) { + //Suspend input to avoid invoking input ready method. + sourceConn.suspendInput(); + SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); + if (sourceContext != null) { + sourceContext.setPipeMarkedToBeConsumed(true); + } SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE); SourceContext.get(sourceConn).setShutDown(true); } From 8119e777b8f7fc1d8111c0987eff3b5558f3277d Mon Sep 17 00:00:00 2001 From: malakaganga Date: Mon, 8 May 2023 13:12:21 +0530 Subject: [PATCH 2/8] Fix Passthrough Threads getting stucked due to request message discard Remove discard request message in ClientWorker since in httpcore-nio already clears the inPlain buffer. Also set suspendInput state on connection so that when inputReady state hits it won't go for handleInvalidState. Also we have decided to proceed with the response regardless of the http status code. But mark target and source connections to be closed Fixes: https://github.com/wso2/api-manager/issues/1792 --- .../transport/passthru/TargetHandler.java | 57 ++++++++++--------- .../transport/passthru/TargetResponse.java | 1 - 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index d7fa89c395..4b9bed6004 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -372,38 +372,41 @@ public void responseReceived(NHttpClientConnection conn) { log.warn("Response received before the request is sent to the backend completely"); // State is not REQUEST_DONE. i.e the request is not completely written. But the response is started // receiving, therefore informing a write error has occurred. So the thread which is - // waiting on writing the request out, will get notified. + // waiting on writing the request out, will get notified. And we will proceed with the response + // regardless of the http status code. But mark target and source connections to be closed. informWriterError(conn); StatusLine errorStatus = response.getStatusLine(); - /* We might receive a 404 or a similar type, even before we write the request body. */ if (errorStatus != null) { - if (errorStatus.getStatusCode() >= HttpStatus.SC_BAD_REQUEST) { - TargetContext.updateState(conn, ProtocolState.REQUEST_DONE); - conn.resetOutput(); + TargetContext.updateState(conn, ProtocolState.REQUEST_DONE); + conn.resetOutput(); + if (log.isDebugEnabled()) { + log.debug(conn + ": Received response with status code : " + response.getStatusLine() + .getStatusCode() + " in invalid state : " + connState.name()); + } + if (errorStatus.getStatusCode() < HttpStatus.SC_BAD_REQUEST) { + log.warn(conn + ": Received a response but request is not completely written to the backend" + + "with status code : " + response.getStatusLine() + .getStatusCode() + " in state : " + connState.name()); + } + if (requestMsgContext != null) { + NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( + PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); + if (sourceConn != null) { + //Suspend input to avoid invoking input ready method. + sourceConn.suspendInput(); + SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); + if (sourceContext != null) { + sourceContext.setPipeMarkedToBeConsumed(true); + } + SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE); + SourceContext.get(sourceConn).setShutDown(true); + } + } else { if (log.isDebugEnabled()) { - log.debug(conn + ": Received response with status code : " + response.getStatusLine() - .getStatusCode() + " in invalid state : " + connState.name()); + log.debug(conn + ": has not started any request"); } - if (requestMsgContext != null) { - NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( - PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); - if (sourceConn != null) { - //Suspend input to avoid invoking input ready method. - sourceConn.suspendInput(); - SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); - if (sourceContext != null) { - sourceContext.setPipeMarkedToBeConsumed(true); - } - SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE); - SourceContext.get(sourceConn).setShutDown(true); - } - } else { - if (log.isDebugEnabled()) { - log.debug(conn + ": has not started any request"); - } - if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT) { - return; // ignoring the stale connection close - } + if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT) { + return; // ignoring the stale connection close } } } else { diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java index 5b039cbe8b..73f53d514b 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java @@ -263,7 +263,6 @@ private int getKeepAliveTimeout(String keepAlive) { } public boolean isForceShutdownConnectionOnComplete() { - return forceShutdownConnectionOnComplete; } } From d2a25f8bdc125769e85c02e1d9fecab18e60197d Mon Sep 17 00:00:00 2001 From: malakaganga Date: Mon, 8 May 2023 18:18:57 +0530 Subject: [PATCH 3/8] Set New Buffer for Error Responses in Pipe Introduce way to respond on the source handler side without consuming the request buffer by introducing a new method in RelayUtils class to allocate a new buffer for the response Also, set suspendInput() state on connection so that when inputReady state hits it won't go for handleInvalidState. Also mark the source connection to be closed. Introduce consume_and_discard property to enable/disable target handler side request message discard logic. Also move setting REQUEST_DONE when response recived in a partial request scenario to source handler to avoid concurrency issues. Fixes: https://github.com/wso2/api-manager/issues/1792 --- .../transport/passthru/ClientWorker.java | 9 +- .../transport/passthru/SourceContext.java | 10 +- .../transport/passthru/SourceHandler.java | 121 +++++++++--------- .../transport/passthru/TargetHandler.java | 4 +- .../config/PassThroughConfigPNames.java | 7 + .../config/PassThroughConfiguration.java | 9 ++ .../transport/passthru/util/RelayUtils.java | 21 +++ 7 files changed, 114 insertions(+), 67 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java index 1f02fe907a..6885d085f1 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java @@ -40,6 +40,7 @@ import org.apache.synapse.transport.customlogsetter.CustomLogSetter; import org.apache.synapse.transport.http.conn.SynapseDebugInfoHolder; import org.apache.synapse.transport.nhttp.NhttpConstants; +import org.apache.synapse.transport.passthru.config.PassThroughConfiguration; import org.apache.synapse.transport.passthru.config.TargetConfiguration; import org.apache.synapse.transport.passthru.util.RelayUtils; @@ -63,6 +64,8 @@ public class ClientWorker implements Runnable { /** the axis2 message context of the request */ private MessageContext requestMessageContext; + private PassThroughConfiguration conf = PassThroughConfiguration.getInstance(); + public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response) { @@ -223,7 +226,11 @@ public void run() { getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis()); } try { - + // If an error has happened in the request processing, consumes the data in pipe completely and discard it + // If the consumeAndDiscard property is set to true + if (response.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) { + RelayUtils.discardRequestMessage(requestMessageContext); + } if (expectEntityBody) { String cType = response.getHeader(HTTP.CONTENT_TYPE); if(cType == null){ diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java index 8c20df7786..0c94f79f2b 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java @@ -40,7 +40,7 @@ public class SourceContext { private SourceConfiguration sourceConfiguration; - private boolean isPipeMarkedToBeConsumed = false; + private boolean isSourceRequestMarkedToBeDiscarded = false; private ProtocolState state = ProtocolState.REQUEST_READY; @@ -72,12 +72,12 @@ public ProtocolState getState() { return state; } - public boolean isPipeMarkedToBeConsumed() { - return isPipeMarkedToBeConsumed; + public boolean isSourceRequestMarkedToBeDiscarded() { + return isSourceRequestMarkedToBeDiscarded; } - public void setPipeMarkedToBeConsumed(boolean isPipeDiscarded) { - this.isPipeMarkedToBeConsumed = isPipeDiscarded; + public void setIsSourceRequestMarkedToBeDiscarded(boolean isPipeDiscarded) { + this.isSourceRequestMarkedToBeDiscarded = isPipeDiscarded; } diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java index f5d13fb026..376ed91c77 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java @@ -229,7 +229,7 @@ public void inputReady(NHttpServerConnection conn, // inputReady is already called prior to suspendInput method is called in TargetHandler. SourceContext sourceContext = (SourceContext) conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); - if (sourceContext != null && sourceContext.isPipeMarkedToBeConsumed()) { + if (sourceContext != null && sourceContext.isSourceRequestMarkedToBeDiscarded()) { return; } handleInvalidState(conn, "Request message body data received"); @@ -347,18 +347,23 @@ private void dropSourceConnection(NHttpServerConnection conn) { public void responseReady(NHttpServerConnection conn) { try { ProtocolState protocolState = SourceContext.getState(conn); - if (protocolState.compareTo(ProtocolState.REQUEST_DONE) < 0) { - return; - } + SourceContext sourceContext = (SourceContext) + conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); if (protocolState.compareTo(ProtocolState.CLOSING) >= 0) { informWriterError(conn); return; } - - if (protocolState != ProtocolState.REQUEST_DONE) { - handleInvalidState(conn, "Writing a response"); - return; + if (sourceContext != null && sourceContext.isSourceRequestMarkedToBeDiscarded() && protocolState != ProtocolState.REQUEST_READY) { + SourceContext.updateState(conn, ProtocolState.REQUEST_DONE); + } else { + if (protocolState.compareTo(ProtocolState.REQUEST_DONE) < 0) { + return; + } + if (protocolState != ProtocolState.REQUEST_DONE) { + handleInvalidState(conn, "Writing a response"); + return; + } } // because the duplex nature of http core we can reach hear without a actual response @@ -382,7 +387,7 @@ public void responseReady(NHttpServerConnection conn) { } } } - + response.start(conn); HttpContext context = conn.getContext(); if (transportLatencyLog.isDebugEnabled()) { @@ -437,21 +442,21 @@ public void outputReady(NHttpServerConnection conn, if(protocolState == ProtocolState.WSDL_RESPONSE_DONE){ //decrement request count for wsdl responses metrics.requestServed(); - // we need to shut down if the shutdown flag is set - HttpContext context = conn.getContext(); - ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute( + // we need to shut down if the shutdown flag is set + HttpContext context = conn.getContext(); + ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute( "synapse.response-source-buffer"); - int bytesWritten = outBuf.produceContent(encoder); + int bytesWritten = outBuf.produceContent(encoder); if (metrics != null && bytesWritten > 0) { metrics.incrementBytesSent(bytesWritten); } conn.requestInput(); if(outBuf instanceof SimpleOutputBuffer && !((SimpleOutputBuffer)outBuf).hasData()){ - sourceConfiguration.getSourceConnections().releaseConnection(conn); + sourceConfiguration.getSourceConnections().releaseConnection(conn); } endTransaction(conn); - return; + return; } @@ -562,7 +567,7 @@ else if (e.getMessage() != null) { } public void timeout(NHttpServerConnection conn) { - boolean isTimeoutOccurred = false; + boolean isTimeoutOccurred = false; ProtocolState state = SourceContext.getState(conn); Map logDetails = getLoggingInfo(conn, state); @@ -604,7 +609,7 @@ public void timeout(NHttpServerConnection conn) { } } else if (state == ProtocolState.REQUEST_DONE) { informWriterError(conn); - isTimeoutOccurred = true; + isTimeoutOccurred = true; metrics.timeoutOccured(); log.warn( "STATE_DESCRIPTION = Socket Timeout occurred after accepting the request headers and the request " @@ -624,9 +629,9 @@ public void timeout(NHttpServerConnection conn) { SourceContext.updateState(conn, ProtocolState.CLOSED); sourceConfiguration.getSourceConnections().closeConnection(conn, true); - if (isTimeoutOccurred) { - rollbackTransaction(conn); - } + if (isTimeoutOccurred) { + rollbackTransaction(conn); + } } public void closed(NHttpServerConnection conn) { @@ -639,7 +644,7 @@ public void closed(NHttpServerConnection conn) { getConnectionLoggingInfo(conn)); } } else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) { - isFault = true; + isFault = true; informReaderError(conn); log.warn("STATE_DESCRIPTION = Connection closed while server accepting request headers but prior to " + "finish reading the request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails @@ -654,7 +659,7 @@ public void closed(NHttpServerConnection conn) { logHttpRequestErrorInCorrelationLog(conn, "Connection Closed in " + state.name()); } } else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) { - isFault = true; + isFault = true; informWriterError(conn); log.warn("STATE_DESCRIPTION = Connection closed while server writing the response headers or body, " + "INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", " @@ -667,7 +672,7 @@ public void closed(NHttpServerConnection conn) { logHttpRequestErrorInCorrelationLog(conn, "Connection Closed in " + state.name()); } } else if (state == ProtocolState.REQUEST_DONE) { - isFault = true; + isFault = true; informWriterError(conn); log.warn("STATE_DESCRIPTION = Connection closed after server accepting the request headers and the " + "request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", " @@ -687,7 +692,7 @@ public void closed(NHttpServerConnection conn) { sourceConfiguration.getSourceConnections().closeConnection(conn, isFault); if (isFault) { rollbackTransaction(conn); - } + } } public void endOfInput(NHttpServerConnection conn) throws IOException { @@ -695,7 +700,7 @@ public void endOfInput(NHttpServerConnection conn) throws IOException { } public void exception(NHttpServerConnection conn, Exception ex) { - boolean isFault = false; + boolean isFault = false; if (ex instanceof IOException) { logIOException(conn, (IOException) ex); if (PassThroughCorrelationConfigDataHolder.isEnable()) { @@ -760,10 +765,10 @@ public void exception(NHttpServerConnection conn, Exception ex) { sourceConfiguration.getSourceConnections().shutDownConnection(conn, true); isFault = true; } - - if (isFault) { - rollbackTransaction(conn); - } + + if (isFault) { + rollbackTransaction(conn); + } } private Map getLoggingInfo(NHttpServerConnection conn, ProtocolState state) { @@ -927,42 +932,42 @@ public SourceRequest getSourceRequest(NHttpServerConnection conn) throws IOExcep metrics.incrementMessagesReceived(); return request; } - - private void rollbackTransaction(NHttpServerConnection conn) { - try { - Long serverWorkerThreadId = (Long) conn.getContext().getAttribute( - PassThroughConstants.SERVER_WORKER_THREAD_ID); - if (serverWorkerThreadId != null) { - TranscationManger.rollbackTransaction(false, - serverWorkerThreadId); - } - } catch (Exception ex) { - log.warn("Transaction rollback error after Connection closed " - + ex.getMessage() + conn); - } - } - - private void endTransaction(NHttpServerConnection conn) { - try { - Long serverWorkerThreadId = (Long) conn.getContext().getAttribute( - PassThroughConstants.SERVER_WORKER_THREAD_ID); - if (serverWorkerThreadId != null) { - TranscationManger.endTransaction(false, serverWorkerThreadId); - } - } catch (Exception ex) { - log.warn("Transaction rollback error after Connection closed " - + ex.getMessage() + conn); - } - } - private String getConnectionLoggingInfo(NHttpServerConnection conn) { + private void rollbackTransaction(NHttpServerConnection conn) { + try { + Long serverWorkerThreadId = (Long) conn.getContext().getAttribute( + PassThroughConstants.SERVER_WORKER_THREAD_ID); + if (serverWorkerThreadId != null) { + TranscationManger.rollbackTransaction(false, + serverWorkerThreadId); + } + } catch (Exception ex) { + log.warn("Transaction rollback error after Connection closed " + + ex.getMessage() + conn); + } + } + + private void endTransaction(NHttpServerConnection conn) { + try { + Long serverWorkerThreadId = (Long) conn.getContext().getAttribute( + PassThroughConstants.SERVER_WORKER_THREAD_ID); + if (serverWorkerThreadId != null) { + TranscationManger.endTransaction(false, serverWorkerThreadId); + } + } catch (Exception ex) { + log.warn("Transaction rollback error after Connection closed " + + ex.getMessage() + conn); + } + } + + private String getConnectionLoggingInfo(NHttpServerConnection conn) { if (conn instanceof LoggingNHttpServerConnection) { IOSession session = ((LoggingNHttpServerConnection) conn).getIOSession(); if (session != null) { return " Remote Address : " + session.getRemoteAddress(); } } - return ""; + return ""; } private String getClientConnectionInfo(NHttpServerConnection conn) { diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index 4b9bed6004..050cea0735 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -49,7 +49,6 @@ import org.apache.synapse.transport.passthru.config.TargetConfiguration; import org.apache.synapse.transport.passthru.connections.HostConnections; import org.apache.synapse.transport.passthru.jmx.PassThroughTransportMetricsCollector; -import org.apache.synapse.transport.passthru.util.RelayUtils; import java.io.IOException; import java.net.SocketAddress; @@ -396,9 +395,8 @@ public void responseReceived(NHttpClientConnection conn) { sourceConn.suspendInput(); SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); if (sourceContext != null) { - sourceContext.setPipeMarkedToBeConsumed(true); + sourceContext.setIsSourceRequestMarkedToBeDiscarded(true); } - SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE); SourceContext.get(sourceConn).setShutDown(true); } } else { diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java index a8e816b025..047727ca88 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java @@ -85,6 +85,13 @@ public interface PassThroughConfigPNames { */ public String DISABLE_KEEPALIVE = "http.connection.disable.keepalive"; + + /** + * Define property to mark If an error has happened in the request processing, + * should consumes the data in pipe completely and discard. + */ + public String CONSUME_AND_DISCARD = "consume_and_discard"; + /** * Defines the time interval for idle connection removal. */ diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java index d7ec7ce05b..3b5e0141e0 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java @@ -51,6 +51,8 @@ public class PassThroughConfiguration { private static final int DEFAULT_CONNECTION_GRACE_TIME = 10000; private Boolean isKeepAliveDisabled = null; + private Boolean isConsumeAndDiscard = true; + //additional rest dispatch handlers private static final String REST_DISPATCHER_SERVICE="rest.dispatcher.service"; // URI configurations that determine if it requires custom rest dispatcher @@ -116,6 +118,13 @@ public boolean isKeepAliveDisabled() { return isKeepAliveDisabled.booleanValue(); } + public boolean isConsumeAndDiscard() { + isConsumeAndDiscard = + ConfigurationBuilderUtil.getBooleanProperty(PassThroughConfigPNames.CONSUME_AND_DISCARD, + true, props); + return isConsumeAndDiscard; + } + public int getMaxActiveConnections() { return getIntProperty(PassThroughConfigPNames.C_MAX_ACTIVE, DEFAULT_MAX_ACTIVE_CON); } diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java index b5b960edaf..f106bc4b8d 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java @@ -46,7 +46,10 @@ import org.apache.synapse.transport.nhttp.NhttpConstants; import org.apache.synapse.transport.passthru.PassThroughConstants; import org.apache.synapse.transport.passthru.Pipe; +import org.apache.synapse.transport.passthru.ProtocolState; import org.apache.synapse.transport.passthru.ServerWorker; +import org.apache.synapse.transport.passthru.SourceContext; +import org.apache.synapse.transport.passthru.TargetContext; import org.apache.synapse.transport.passthru.TargetRequest; import org.apache.synapse.transport.passthru.config.PassThroughConfiguration; @@ -532,6 +535,24 @@ public static void discardRequestMessage(MessageContext msgContext) throws AxisF discardMessage(requestContext); } + public static void discardSourceRequest(MessageContext msgContext) throws AxisFault { + NHttpServerConnection sourceConn = (NHttpServerConnection) msgContext.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); + if (sourceConn != null) { + sourceConn.suspendInput(); + SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); + if (sourceContext != null) { + sourceContext.setIsSourceRequestMarkedToBeDiscarded(true); + } + SourceContext.get(sourceConn).setShutDown(true); + } + Pipe pipe = (Pipe) msgContext.getProperty(PassThroughConstants.PASS_THROUGH_PIPE); + if (pipe != null) { + pipe.getBuffer().clear(); + pipe.resetOutputStream(); + msgContext.setProperty(PassThroughConstants.MESSAGE_BUILDER_INVOKED, Boolean.TRUE); + } + } + /** * Generate a log message containing the request information for the given message context. * From ee76f602696f753e74aac4bc2d5cde8f0df2a166 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Wed, 10 May 2023 10:27:19 +0530 Subject: [PATCH 4/8] fix_config --- .../transport/passthru/config/PassThroughConfiguration.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java index 3b5e0141e0..24510ddf41 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java @@ -40,6 +40,7 @@ public class PassThroughConfiguration { * Default tuning parameter values */ private static final int DEFAULT_WORKER_POOL_SIZE_CORE = 40; + private static final boolean DEFAULT_CONSUME_AND_DISCARD = true; private static final int DEFAULT_WORKER_POOL_SIZE_MAX = 200; private static final int DEFAULT_WORKER_THREAD_KEEPALIVE_SEC = 60; private static final int DEFAULT_WORKER_POOL_QUEUE_LENGTH = -1; @@ -119,9 +120,7 @@ public boolean isKeepAliveDisabled() { } public boolean isConsumeAndDiscard() { - isConsumeAndDiscard = - ConfigurationBuilderUtil.getBooleanProperty(PassThroughConfigPNames.CONSUME_AND_DISCARD, - true, props); + isConsumeAndDiscard = getBooleanProperty(PassThroughConfigPNames.CONSUME_AND_DISCARD, DEFAULT_CONSUME_AND_DISCARD); return isConsumeAndDiscard; } From 31fc215e1c2d0b795ea624db125ba2760864ffe0 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Wed, 10 May 2023 18:33:13 +0530 Subject: [PATCH 5/8] Fix not closing TLS session prior to tcp connection close in TargetHandler side Unless some other fatal alert has been transmitted, each party is required to send a close_notify alert before closing the write side of the connection. When using shutdown method this is not hapenning. So changed the behavior to closing TLS session prior to underlying TCP connection Fixes: https://github.com/wso2/api-manager/issues/1765 --- .../org/apache/synapse/transport/passthru/TargetHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index 050cea0735..17613e8939 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -714,7 +714,7 @@ public void closed(NHttpClientConnection conn) { metrics.disconnected(); TargetContext.updateState(conn, ProtocolState.CLOSED); - targetConfiguration.getConnections().shutdownConnection(conn, isFault); + targetConfiguration.getConnections().closeConnection(conn, isFault); } @@ -818,7 +818,7 @@ public void timeout(NHttpClientConnection conn) { } TargetContext.updateState(conn, ProtocolState.CLOSED); - targetConfiguration.getConnections().shutdownConnection(conn, true); + targetConfiguration.getConnections().closeConnection(conn, true); } private boolean isResponseHaveBodyExpected( From 8d7da4e6d1175bcfff8a85b10770c1e55eff34b9 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Thu, 11 May 2023 13:04:53 +0530 Subject: [PATCH 6/8] Alter fix done to Remove the consume and discard to support backward compatibility Related: https://github.com/wso2-support/wso2-synapse/pull/2080 Fixes: https://github.com/wso2/api-manager/issues/1792 --- .../transport/passthru/TargetHandler.java | 29 ++++++++++++------- .../transport/passthru/util/RelayUtils.java | 8 +++++ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index 17613e8939..7429fa2b23 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -45,6 +45,7 @@ import org.apache.synapse.transport.http.conn.LoggingNHttpClientConnection; import org.apache.synapse.transport.http.conn.ProxyTunnelHandler; import org.apache.synapse.transport.nhttp.NhttpConstants; +import org.apache.synapse.transport.passthru.config.PassThroughConfiguration; import org.apache.synapse.transport.passthru.config.PassThroughCorrelationConfigDataHolder; import org.apache.synapse.transport.passthru.config.TargetConfiguration; import org.apache.synapse.transport.passthru.connections.HostConnections; @@ -89,6 +90,8 @@ public class TargetHandler implements NHttpClientEventHandler { public static final String VALID_MAX_MESSAGE_SIZE = "valid.max.message.size.in.bytes"; public static final String CONNECTION_POOL = "CONNECTION_POOL"; + private PassThroughConfiguration conf = PassThroughConfiguration.getInstance(); + public TargetHandler(DeliveryAgent deliveryAgent, ClientConnFactory connFactory, TargetConfiguration configuration) { @@ -383,19 +386,23 @@ public void responseReceived(NHttpClientConnection conn) { .getStatusCode() + " in invalid state : " + connState.name()); } if (errorStatus.getStatusCode() < HttpStatus.SC_BAD_REQUEST) { - log.warn(conn + ": Received a response but request is not completely written to the backend" - + "with status code : " + response.getStatusLine() - .getStatusCode() + " in state : " + connState.name()); + log.warn(conn + ": Received a response with status code : " + + response.getStatusLine().getStatusCode() + " in state : " + connState.name() + + "but request is not completely written to the backend"); } if (requestMsgContext != null) { NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); if (sourceConn != null) { - //Suspend input to avoid invoking input ready method. - sourceConn.suspendInput(); - SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); - if (sourceContext != null) { - sourceContext.setIsSourceRequestMarkedToBeDiscarded(true); + if (!conf.isConsumeAndDiscard()) { + //Suspend input to avoid invoking input ready method and set this property here + //to avoid invoking the input ready method, while response is mediating through the + // mediation since we have set REQUEST_DONE state in SourceHandler responseReady method + sourceConn.suspendInput(); + SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION); + if (sourceContext != null) { + sourceContext.setIsSourceRequestMarkedToBeDiscarded(true); + } } SourceContext.get(sourceConn).setShutDown(true); } @@ -721,9 +728,9 @@ public void closed(NHttpClientConnection conn) { private void logIOException(NHttpClientConnection conn, IOException e) { String message = getErrorMessage("I/O error : " + e.getMessage(), conn); - if (e instanceof ConnectionClosedException || (e.getMessage() != null && - e.getMessage().toLowerCase().contains("connection reset by peer") || - e.getMessage().toLowerCase().contains("forcibly closed"))) { + if (e.getMessage() != null && (e instanceof ConnectionClosedException + || e.getMessage().toLowerCase().contains("connection reset by peer") + || e.getMessage().toLowerCase().contains("forcibly closed"))) { if (log.isDebugEnabled()) { log.debug(conn + ": I/O error (Probably the keep-alive connection " + "was closed):" + e.getMessage() diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java index f106bc4b8d..e7a3971dae 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java @@ -78,6 +78,8 @@ public class RelayUtils { private static boolean forceJSONValidation = false; + private static PassThroughConfiguration conf = PassThroughConfiguration.getInstance(); + static { if (forcePTBuild == null) { forcePTBuild = PassThroughConfiguration.getInstance().getBooleanProperty( @@ -536,6 +538,12 @@ public static void discardRequestMessage(MessageContext msgContext) throws AxisF } public static void discardSourceRequest(MessageContext msgContext) throws AxisFault { + //If consume_and_discard property is set to true, then we call the consumeAndDiscardMessage method + //to keep the backward compatibility. + if (conf.isConsumeAndDiscard()) { + consumeAndDiscardMessage(msgContext); + return; + } NHttpServerConnection sourceConn = (NHttpServerConnection) msgContext.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); if (sourceConn != null) { sourceConn.suspendInput(); From 1d4ea09e3924951fbeac6a3dcadf6b25dc57b35d Mon Sep 17 00:00:00 2001 From: malakaganga Date: Fri, 12 May 2023 11:08:50 +0530 Subject: [PATCH 7/8] Log improvements --- .../apache/synapse/transport/passthru/TargetHandler.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index 7429fa2b23..8f09e82bf2 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -371,7 +371,8 @@ public void responseReceived(NHttpClientConnection conn) { if (connState != ProtocolState.REQUEST_DONE) { isError = true; MessageContext requestMsgContext = TargetContext.get(conn).getRequestMsgCtx(); - log.warn("Response received before the request is sent to the backend completely"); + log.warn("Response received before the request is sent to the backend completely , CORRELATION_ID = " + + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); // State is not REQUEST_DONE. i.e the request is not completely written. But the response is started // receiving, therefore informing a write error has occurred. So the thread which is // waiting on writing the request out, will get notified. And we will proceed with the response @@ -388,7 +389,8 @@ public void responseReceived(NHttpClientConnection conn) { if (errorStatus.getStatusCode() < HttpStatus.SC_BAD_REQUEST) { log.warn(conn + ": Received a response with status code : " + response.getStatusLine().getStatusCode() + " in state : " + connState.name() - + "but request is not completely written to the backend"); + + " but request is not completely written to the backend, CORRELATION_ID = " + + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); } if (requestMsgContext != null) { NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( From 1626fbdb058492a07234762a70ce214be838076c Mon Sep 17 00:00:00 2001 From: Nuwan Wimalasekara Date: Sat, 8 Apr 2023 17:35:18 +0530 Subject: [PATCH 8/8] Add the fix in synapse PR 1484 again which is reverted by PR 1719 --- .../apache/synapse/core/axis2/SynapseCallbackReceiver.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java index 6778b3ac0e..5d4d72aa98 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java @@ -177,7 +177,6 @@ public void receive(MessageContext messageCtx) throws AxisFault { } if (callback != null) { - messageCtx.removeProperty(PassThroughConstants.INTERNAL_EXCEPTION_ORIGIN); org.apache.synapse.MessageContext SynapseOutMsgCtx = callback.getSynapseOutMsgCtx(); ConcurrencyThrottlingUtils.decrementConcurrencyThrottleAccessController(SynapseOutMsgCtx); @@ -187,7 +186,7 @@ public void receive(MessageContext messageCtx) throws AxisFault { } callback.setMarkedForRemoval(); } - + messageCtx.removeProperty(PassThroughConstants.INTERNAL_EXCEPTION_ORIGIN); if (RuntimeStatisticCollector.isStatisticsEnabled()) { CallbackStatisticCollector.updateParentsForCallback(SynapseOutMsgCtx, messageID); handleMessage(messageID, messageCtx, SynapseOutMsgCtx, (AsyncCallback) callback); @@ -206,7 +205,6 @@ public void receive(MessageContext messageCtx) throws AxisFault { + " and correlation_id : " + messageCtx.getProperty(CorrelationConstants.CORRELATION_ID) + " But a callback is not registered (anymore) to process " + "this response"); } - messageCtx.removeProperty(PassThroughConstants.INTERNAL_EXCEPTION_ORIGIN); } } else if (!messageCtx.isPropertyTrue(NhttpConstants.SC_ACCEPTED)){ @@ -519,7 +517,7 @@ private void handleMessage(String messageID ,MessageContext response, failOver =true; } } - + // set the properties of the original MC to the new MC for (Object key : synapseOutMsgCtx.getPropertyKeySet()) {