Skip to content

Commit

Permalink
Merge pull request wso2#2090 from arunans23/bny-bulk
Browse files Browse the repository at this point in the history
Port BNY fixes to APIM 3.2.0
  • Loading branch information
arunans23 authored May 12, 2023
2 parents 14c6a7e + 1626fbd commit 0fa6580
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void receive(MessageContext messageCtx) throws AxisFault {
}

if (callback != null) {
messageCtx.removeProperty(PassThroughConstants.INTERNAL_EXCEPTION_ORIGIN);
org.apache.synapse.MessageContext SynapseOutMsgCtx = callback.getSynapseOutMsgCtx();
ConcurrencyThrottlingUtils.decrementConcurrencyThrottleAccessController(SynapseOutMsgCtx);

Expand All @@ -187,7 +186,7 @@ public void receive(MessageContext messageCtx) throws AxisFault {
}
callback.setMarkedForRemoval();
}

messageCtx.removeProperty(PassThroughConstants.INTERNAL_EXCEPTION_ORIGIN);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
CallbackStatisticCollector.updateParentsForCallback(SynapseOutMsgCtx, messageID);
handleMessage(messageID, messageCtx, SynapseOutMsgCtx, (AsyncCallback) callback);
Expand All @@ -206,7 +205,6 @@ public void receive(MessageContext messageCtx) throws AxisFault {
+ " and correlation_id : " + messageCtx.getProperty(CorrelationConstants.CORRELATION_ID)
+ " But a callback is not registered (anymore) to process " + "this response");
}
messageCtx.removeProperty(PassThroughConstants.INTERNAL_EXCEPTION_ORIGIN);
}

} else if (!messageCtx.isPropertyTrue(NhttpConstants.SC_ACCEPTED)){
Expand Down Expand Up @@ -519,7 +517,7 @@ private void handleMessage(String messageID ,MessageContext response,
failOver =true;
}
}

// set the properties of the original MC to the new MC

for (Object key : synapseOutMsgCtx.getPropertyKeySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.apache.synapse.transport.http.conn.SynapseDebugInfoHolder;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.apache.synapse.transport.passthru.config.PassThroughConfiguration;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.util.RelayUtils;

Expand All @@ -63,6 +64,8 @@ public class ClientWorker implements Runnable {
/** the axis2 message context of the request */
private MessageContext requestMessageContext;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

public ClientWorker(TargetConfiguration targetConfiguration,
MessageContext outMsgCtx,
TargetResponse response) {
Expand Down Expand Up @@ -224,10 +227,10 @@ public void run() {
}
try {
// If an error has happened in the request processing, consumes the data in pipe completely and discard it
if (response.isForceShutdownConnectionOnComplete()) {
// If the consumeAndDiscard property is set to true
if (response.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) {
RelayUtils.discardRequestMessage(requestMessageContext);
}

if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
if(cType == null){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class SourceContext {

private SourceConfiguration sourceConfiguration;

private boolean isSourceRequestMarkedToBeDiscarded = false;

private ProtocolState state = ProtocolState.REQUEST_READY;

private SourceRequest request;
Expand Down Expand Up @@ -70,6 +72,15 @@ public ProtocolState getState() {
return state;
}

public boolean isSourceRequestMarkedToBeDiscarded() {
return isSourceRequestMarkedToBeDiscarded;
}

public void setIsSourceRequestMarkedToBeDiscarded(boolean isPipeDiscarded) {
this.isSourceRequestMarkedToBeDiscarded = isPipeDiscarded;
}


public void setState(ProtocolState state) {
this.state = state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,15 @@ public void inputReady(NHttpServerConnection conn,
conn.getContext().setAttribute(PassThroughConstants.REQ_FROM_CLIENT_BODY_READ_START_TIME, chunkReadStartTime);
}
ProtocolState protocolState = SourceContext.getState(conn);

if (protocolState != ProtocolState.REQUEST_HEAD
&& protocolState != ProtocolState.REQUEST_BODY) {
// This logic is added specifically here to avoid a race condition that can occur when
// inputReady is already called prior to suspendInput method is called in TargetHandler.
SourceContext sourceContext = (SourceContext)
conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);
if (sourceContext != null && sourceContext.isSourceRequestMarkedToBeDiscarded()) {
return;
}
handleInvalidState(conn, "Request message body data received");
return;
}
Expand Down Expand Up @@ -341,18 +347,23 @@ private void dropSourceConnection(NHttpServerConnection conn) {
public void responseReady(NHttpServerConnection conn) {
try {
ProtocolState protocolState = SourceContext.getState(conn);
if (protocolState.compareTo(ProtocolState.REQUEST_DONE) < 0) {
return;
}
SourceContext sourceContext = (SourceContext)
conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);

if (protocolState.compareTo(ProtocolState.CLOSING) >= 0) {
informWriterError(conn);
return;
}

if (protocolState != ProtocolState.REQUEST_DONE) {
handleInvalidState(conn, "Writing a response");
return;
if (sourceContext != null && sourceContext.isSourceRequestMarkedToBeDiscarded() && protocolState != ProtocolState.REQUEST_READY) {
SourceContext.updateState(conn, ProtocolState.REQUEST_DONE);
} else {
if (protocolState.compareTo(ProtocolState.REQUEST_DONE) < 0) {
return;
}
if (protocolState != ProtocolState.REQUEST_DONE) {
handleInvalidState(conn, "Writing a response");
return;
}
}

// because the duplex nature of http core we can reach hear without a actual response
Expand All @@ -376,7 +387,7 @@ public void responseReady(NHttpServerConnection conn) {
}
}
}

response.start(conn);
HttpContext context = conn.getContext();
if (transportLatencyLog.isDebugEnabled()) {
Expand Down Expand Up @@ -431,21 +442,21 @@ public void outputReady(NHttpServerConnection conn,
if(protocolState == ProtocolState.WSDL_RESPONSE_DONE){
//decrement request count for wsdl responses
metrics.requestServed();
// we need to shut down if the shutdown flag is set
HttpContext context = conn.getContext();
ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(
// we need to shut down if the shutdown flag is set
HttpContext context = conn.getContext();
ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(
"synapse.response-source-buffer");
int bytesWritten = outBuf.produceContent(encoder);
int bytesWritten = outBuf.produceContent(encoder);
if (metrics != null && bytesWritten > 0) {
metrics.incrementBytesSent(bytesWritten);
}

conn.requestInput();
if(outBuf instanceof SimpleOutputBuffer && !((SimpleOutputBuffer)outBuf).hasData()){
sourceConfiguration.getSourceConnections().releaseConnection(conn);
sourceConfiguration.getSourceConnections().releaseConnection(conn);
}
endTransaction(conn);
return;
return;
}


Expand Down Expand Up @@ -556,7 +567,7 @@ else if (e.getMessage() != null) {
}

public void timeout(NHttpServerConnection conn) {
boolean isTimeoutOccurred = false;
boolean isTimeoutOccurred = false;
ProtocolState state = SourceContext.getState(conn);
Map<String, String> logDetails = getLoggingInfo(conn, state);

Expand Down Expand Up @@ -598,7 +609,7 @@ public void timeout(NHttpServerConnection conn) {
}
} else if (state == ProtocolState.REQUEST_DONE) {
informWriterError(conn);
isTimeoutOccurred = true;
isTimeoutOccurred = true;
metrics.timeoutOccured();
log.warn(
"STATE_DESCRIPTION = Socket Timeout occurred after accepting the request headers and the request "
Expand All @@ -618,9 +629,9 @@ public void timeout(NHttpServerConnection conn) {
SourceContext.updateState(conn, ProtocolState.CLOSED);

sourceConfiguration.getSourceConnections().closeConnection(conn, true);
if (isTimeoutOccurred) {
rollbackTransaction(conn);
}
if (isTimeoutOccurred) {
rollbackTransaction(conn);
}
}

public void closed(NHttpServerConnection conn) {
Expand All @@ -633,7 +644,7 @@ public void closed(NHttpServerConnection conn) {
getConnectionLoggingInfo(conn));
}
} else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) {
isFault = true;
isFault = true;
informReaderError(conn);
log.warn("STATE_DESCRIPTION = Connection closed while server accepting request headers but prior to "
+ "finish reading the request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails
Expand All @@ -648,7 +659,7 @@ public void closed(NHttpServerConnection conn) {
logHttpRequestErrorInCorrelationLog(conn, "Connection Closed in " + state.name());
}
} else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) {
isFault = true;
isFault = true;
informWriterError(conn);
log.warn("STATE_DESCRIPTION = Connection closed while server writing the response headers or body, "
+ "INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", "
Expand All @@ -661,7 +672,7 @@ public void closed(NHttpServerConnection conn) {
logHttpRequestErrorInCorrelationLog(conn, "Connection Closed in " + state.name());
}
} else if (state == ProtocolState.REQUEST_DONE) {
isFault = true;
isFault = true;
informWriterError(conn);
log.warn("STATE_DESCRIPTION = Connection closed after server accepting the request headers and the "
+ "request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", "
Expand All @@ -681,15 +692,15 @@ public void closed(NHttpServerConnection conn) {
sourceConfiguration.getSourceConnections().closeConnection(conn, isFault);
if (isFault) {
rollbackTransaction(conn);
}
}
}

public void endOfInput(NHttpServerConnection conn) throws IOException {
conn.close();
}

public void exception(NHttpServerConnection conn, Exception ex) {
boolean isFault = false;
boolean isFault = false;
if (ex instanceof IOException) {
logIOException(conn, (IOException) ex);
if (PassThroughCorrelationConfigDataHolder.isEnable()) {
Expand Down Expand Up @@ -754,10 +765,10 @@ public void exception(NHttpServerConnection conn, Exception ex) {
sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
isFault = true;
}
if (isFault) {
rollbackTransaction(conn);
}

if (isFault) {
rollbackTransaction(conn);
}
}

private Map<String, String> getLoggingInfo(NHttpServerConnection conn, ProtocolState state) {
Expand Down Expand Up @@ -921,42 +932,42 @@ public SourceRequest getSourceRequest(NHttpServerConnection conn) throws IOExcep
metrics.incrementMessagesReceived();
return request;
}

private void rollbackTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.rollbackTransaction(false,
serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private void endTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.endTransaction(false, serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private String getConnectionLoggingInfo(NHttpServerConnection conn) {
private void rollbackTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.rollbackTransaction(false,
serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private void endTransaction(NHttpServerConnection conn) {
try {
Long serverWorkerThreadId = (Long) conn.getContext().getAttribute(
PassThroughConstants.SERVER_WORKER_THREAD_ID);
if (serverWorkerThreadId != null) {
TranscationManger.endTransaction(false, serverWorkerThreadId);
}
} catch (Exception ex) {
log.warn("Transaction rollback error after Connection closed "
+ ex.getMessage() + conn);
}
}

private String getConnectionLoggingInfo(NHttpServerConnection conn) {
if (conn instanceof LoggingNHttpServerConnection) {
IOSession session = ((LoggingNHttpServerConnection) conn).getIOSession();
if (session != null) {
return " Remote Address : " + session.getRemoteAddress();
}
}
return "";
return "";
}

private String getClientConnectionInfo(NHttpServerConnection conn) {
Expand Down
Loading

0 comments on commit 0fa6580

Please sign in to comment.