Skip to content

Commit

Permalink
Refactor timeout and close routines; seems to plug the container leak
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Oct 22, 2024
1 parent 38d184b commit e15836e
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Stream;

import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCode;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.Extension;
import javax.websocket.Session;

Expand Down Expand Up @@ -65,6 +68,7 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
// associated websocket session
private final WsSession wsSession;

// reference to the scope for manager access
private WeakReference<WebSocketScope> scope;

// unique identifier for the session
Expand Down Expand Up @@ -163,6 +167,8 @@ public WebSocketConnection(WebSocketScope scope, Session session) {
// add the timeouts to the user props
userProps.put(Constants.READ_IDLE_TIMEOUT_MS, readTimeout);
userProps.put(Constants.WRITE_IDLE_TIMEOUT_MS, sendTimeout);
// set the close timeout to 5 seconds
userProps.put(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, TimeUnit.SECONDS.toMillis(5));
if (isDebug) {
log.debug("userProps: {}", userProps);
}
Expand All @@ -186,9 +192,8 @@ public void send(String data) throws UnsupportedEncodingException, IOException {
}
// process the incoming string
if (StringUtils.isNotBlank(data)) {
final WsSession session = wsSession;
// attempt send only if the session is not closed
if (session != null && !session.isClosed()) {
if (!wsSession.isClosed()) {
try {
if (useAsync) {
if (sendFuture != null && !sendFuture.isDone()) {
Expand All @@ -197,21 +202,21 @@ public void send(String data) throws UnsupportedEncodingException, IOException {
} catch (TimeoutException e) {
log.warn("Send timed out {}", wsSessionId);
// if the session is not open, cancel the future
if (!session.isOpen()) {
if (!wsSession.isOpen()) {
sendFuture.cancel(true);
return;
}
}
}
synchronized (wsSessionId) {
int lengthToWrite = data.getBytes().length;
sendFuture = session.getAsyncRemote().sendText(data);
sendFuture = wsSession.getAsyncRemote().sendText(data);
updateWriteBytes(lengthToWrite);
}
} else {
synchronized (wsSessionId) {
int lengthToWrite = data.getBytes().length;
session.getBasicRemote().sendText(data);
wsSession.getBasicRemote().sendText(data);
updateWriteBytes(lengthToWrite);
}
}
Expand All @@ -236,8 +241,7 @@ public void send(byte[] buf) throws IOException {
if (isDebug) {
log.debug("send binary: {}", Arrays.toString(buf));
}
WsSession session = wsSession;
if (session != null && session.isOpen()) {
if (!wsSession.isClosed()) {
try {
// send the bytes
if (useAsync) {
Expand All @@ -253,12 +257,12 @@ public void send(byte[] buf) throws IOException {
}
}
synchronized (wsSessionId) {
sendFuture = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf));
sendFuture = wsSession.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf));
updateWriteBytes(buf.length);
}
} else {
synchronized (wsSessionId) {
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf));
wsSession.getBasicRemote().sendBinary(ByteBuffer.wrap(buf));
updateWriteBytes(buf.length);
}
}
Expand All @@ -281,11 +285,10 @@ public void sendPing(byte[] buf) throws IllegalArgumentException, IOException {
if (isTrace) {
log.trace("send ping: {}", buf);
}
WsSession session = wsSession;
if (session != null && session.isOpen()) {
if (!wsSession.isClosed()) {
synchronized (wsSessionId) {
// send the bytes
session.getBasicRemote().sendPing(ByteBuffer.wrap(buf));
wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(buf));
// update counter
updateWriteBytes(buf.length);
}
Expand All @@ -305,11 +308,10 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
if (isTrace) {
log.trace("send pong: {}", buf);
}
WsSession session = wsSession;
if (session != null && session.isOpen()) {
if (!wsSession.isClosed()) {
synchronized (wsSessionId) {
// send the bytes
session.getBasicRemote().sendPong(ByteBuffer.wrap(buf));
wsSession.getBasicRemote().sendPong(ByteBuffer.wrap(buf));
// update counter
updateWriteBytes(buf.length);
}
Expand All @@ -319,14 +321,34 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
}

/**
* close Connection
* Close the connection.
*/
public void close() {
close(CloseCodes.NORMAL_CLOSURE, "");
}

/**
* Close the connection with a reason.
*
* @param code CloseCode
* @param reasonPhrase short reason for closing
*/
public void close(CloseCode code, String reasonPhrase) {
if (connected.compareAndSet(true, false)) {
log.debug("close: {}", wsSessionId);
// trying to close the session nicely
// no blank reasons
if (reasonPhrase == null) {
reasonPhrase = "";
}
log.debug("close: {} code: {} reason: {}", wsSessionId, code, reasonPhrase);
try {
wsSession.close();
// close the session if open
if (wsSession.isOpen()) {
CloseReason reason = new CloseReason(code, reasonPhrase);
if (isDebug) {
log.debug("Closing session: {} with reason: {}", wsSessionId, reason);
}
wsSession.close(reason);
}
} catch (Exception e) {
log.debug("Exception closing session", e);
}
Expand All @@ -343,39 +365,9 @@ public void close() {
if (headers != null) {
headers = null;
}
if (scope.get() != null) {
// disconnect from scope
scope.get().removeConnection(this);
// clear weak refs
scope.clear();
}
}
}

/*
WsSession uses these userProperties for checkExpiration along with maxIdleTimeout
configuration for read idle timeout on WebSocket session
READ_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.READ_IDLE_TIMEOUT_MS";
configuration for write idle timeout on WebSocket session
WRITE_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.WRITE_IDLE_TIMEOUT_MS";
*/
public void timeoutAsync(long now) {
// XXX(paul) only logging here as we should more than likely rely upon the container checking expiration
log.trace("timeoutAsync: {} on session id: {} read: {} written: {}", now, wsSessionId, readBytes, writtenBytes);
/*
WsSession session = wsSession;
Map<String, Object> props = session.getUserProperties();
log.debug("Session properties: {}", props);
long maxIdleTimeout = session.getMaxIdleTimeout();
long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS);
long sendTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS);
log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, sendTimeout);
//long readDelta = (now - lastReadTime), writeDelta = (now - lastWriteTime);
//log.debug("timeoutAsync: {} on {} last read: {} last write: {}", now, wsSessionId, readDelta, writeDelta);
*/
}

/**
* Async send is enabled in non-Windows based systems; this provides a means to override it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.CloseReason.CloseCodes;

import org.red5.net.websocket.listener.IWebSocketDataListener;
import org.red5.net.websocket.model.WSMessage;
import org.red5.server.api.scope.IScope;
Expand Down Expand Up @@ -91,7 +93,7 @@ public void unregister() {
// clean up the connections by first closing them
conns.forEach(conn -> {
if (conns.remove(conn)) {
conn.close();
conn.close(CloseCodes.GOING_AWAY, "WebSocket scope removed");
}
});
// clean up the listeners by first stopping them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.websocket.CloseReason.CloseCodes;

import org.red5.net.websocket.listener.DefaultWebSocketDataListener;
import org.red5.net.websocket.listener.IWebSocketDataListener;
import org.red5.net.websocket.listener.IWebSocketScopeListener;
Expand Down Expand Up @@ -172,17 +174,12 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) {
wsConn.sendPing(PING_BYTES);
} catch (Exception e) {
log.debug("Exception pinging connection: {} connection will be closed", wsConn.getSessionId(), e);
// if the connection isn't connected, remove them
wsScope.removeConnection(wsConn);
// if the ping fails, consider them gone
wsConn.close();
wsConn.close(CloseCodes.CLOSED_ABNORMALLY, e.getMessage());
}
} else {
log.debug("Removing unconnected connection: {} during ping loop", wsConn.getSessionId());
// if the connection isn't connected, remove them
wsScope.removeConnection(wsConn);
// if connection is not connected, close it (ensure closed / removed)
wsConn.close();
wsConn.close(CloseCodes.UNEXPECTED_CONDITION, "Connection not connected");
}
} catch (Exception e) {
log.warn("Exception in WS pinger", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void onClose(Session session, CloseReason closeReason) {
// force remove on exception
scope.removeConnection(conn);
// fire close, to be sure
conn.close();
conn.close(closeReason.getCloseCode(), closeReason.getReasonPhrase());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ protected void registerSession(Object endpoint, WsSession wsSession) {
*/
@Override
protected void unregisterSession(Object endpoint, WsSession wsSession) {
if (wsSession.getUserPrincipal() != null && wsSession.getHttpSessionId() != null) {
if (wsSession.getHttpSessionId() != null) {
unregisterAuthenticatedSession(wsSession, wsSession.getHttpSessionId());
log.debug("unregisterSession - unregisterAuthenticatedSession: {}", wsSession.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.tomcat.util.net.SocketEvent;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.websocket.Constants;
import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsIOException;
import org.apache.tomcat.websocket.WsSession;
Expand Down Expand Up @@ -78,6 +79,10 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler {

private WsSession wsSession;

private long lastTimeoutCheck = System.currentTimeMillis();

private long lastReadBytes, lastWrittenBytes;

public WsHttpUpgradeHandler() {
applicationClassLoader = Thread.currentThread().getContextClassLoader();
}
Expand Down Expand Up @@ -284,23 +289,66 @@ public void setSslSupport(SSLSupport sslSupport) {
@Override
public void timeoutAsync(long now) {
log.trace("timeoutAsync: {} on session: {}", now, wsSession);
// session methods may not be called if the session is not open
if (wsSession != null) {
if (wsSession.isOpen()) {
try {
// if we have a timeout, inform the ws connection
WebSocketConnection conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION);
if (conn != null) {
conn.timeoutAsync(now);
try {
final String wsSessionId = wsSession.getId();
// get scope from endpoint config
WebSocketScope scope = (WebSocketScope) endpointConfig.getUserProperties().get(WSConstants.WS_SCOPE);
// do lookup by session id, skips need for session user props
WebSocketConnection conn = scope.getConnectionBySessionId(wsSessionId);
// if we don't get it from the scope, try the session lookup
if (conn == null && wsSession.isOpen()) {
// session methods may not be called if its not open
conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION);
}
// last check, if we don't have a connection, log a warning
if (conn == null) {
log.warn("Connection for id: {} was not found in the scope or session: {}", wsSession.getId(), scope.getPath());
return;
}
// negative now means always treat as expired
if (now > 0) {
long checkDelta = now - lastTimeoutCheck;
long readBytes = conn.getReadBytes(), writtenBytes = conn.getWrittenBytes();
log.info("timeoutAsync: {}ms on session id: {} read: {} written: {}", checkDelta, wsSessionId, readBytes, writtenBytes);
Map<String, Object> props = wsSession.getUserProperties();
log.debug("Session properties: {}", props);
long maxIdleTimeout = wsSession.getMaxIdleTimeout();
long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS);
long writeTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS);
log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, writeTimeout);
if (maxIdleTimeout > 0) {
if (checkDelta > maxIdleTimeout && (readBytes == lastReadBytes || writtenBytes == lastWrittenBytes)) {
log.info("Max idle timeout: {}ms on session id: {}", checkDelta, wsSessionId);
conn.close(CloseCodes.GOING_AWAY, "Max idle timeout");
}
} else {
if (readTimeout > 0) {
if (readBytes == lastReadBytes) {
if (checkDelta > readTimeout) {
log.info("Read timeout: {}ms on session id: {}", checkDelta, wsSessionId);
conn.close(CloseCodes.GOING_AWAY, "Read timeout");
}
}
}
if (writeTimeout > 0) {
if (writtenBytes == lastWrittenBytes) {
if (checkDelta > writeTimeout) {
log.info("Write timeout: {}ms on session id: {}", checkDelta, wsSessionId);
conn.close(CloseCodes.GOING_AWAY, "Write timeout");
}
}
}
}
} catch (Throwable t) {
log.warn(sm.getString("wsHttpUpgradeHandler.timeoutAsyncFailed"), t);
lastReadBytes = readBytes;
lastWrittenBytes = writtenBytes;
lastTimeoutCheck = now;
} else {
log.warn("timeoutAsync: negative time on session id: {}", wsSessionId);
conn.close(CloseCodes.GOING_AWAY, "Timeout expired");
}
} else {
log.debug("timeoutAsync: {} session is not open for session id: {}", now, wsSession.getId());
// we need the processor released from the async waitingProcessors list
// located in abstract protocol
//socketWrapper.close();
} catch (Throwable t) {
log.warn(sm.getString("wsHttpUpgradeHandler.timeoutAsyncFailed"), t);
}
}
}
Expand Down

0 comments on commit e15836e

Please sign in to comment.