Skip to content

Commit

Permalink
When terminating detach the connection from request/response objects
Browse files Browse the repository at this point in the history
Related to #3416, #3367
  • Loading branch information
violetagg committed Oct 7, 2024
1 parent e677444 commit c1741b8
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCounted;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -123,9 +130,9 @@ else if (recorder instanceof ContextAwareChannelMetricsRecorder) {
.as(ChannelOperations.class);
}

final Connection connection;
Connection connection;
final FluxReceive inbound;
final ConnectionObserver listener;
ConnectionObserver listener;
final Sinks.Empty<Void> onTerminate;

volatile Subscription outboundSubscription;
Expand Down Expand Up @@ -502,6 +509,8 @@ protected final void terminate() {
// and it is guarded by rebind(connection), so tryEmitEmpty() should happen just once
onTerminate.tryEmitEmpty();
listener.onStateChange(this, ConnectionObserver.State.DISCONNECTING);
connection = new DisposedConnection(channel());
listener = ConnectionObserver.emptyListener();
}
}

Expand Down Expand Up @@ -681,4 +690,106 @@ static OnSetup empty() {
Subscription.class,
"outboundSubscription");

static final class DisposedChannel extends AbstractChannel {

final DefaultChannelConfig config;
final SocketAddress localAddress;
final ChannelMetadata metadata;
final SocketAddress remoteAddress;

DisposedChannel(Channel copy) {
super(null);
this.metadata = copy.metadata();
this.config = new DefaultChannelConfig(this);
this.localAddress = copy.localAddress();
this.remoteAddress = copy.remoteAddress();
}

@Override
public ChannelConfig config() {
return config;
}

@Override
protected void doBeginRead() {
throw new UnsupportedOperationException();
}

@Override
protected void doBind(SocketAddress socketAddress) {
throw new UnsupportedOperationException();
}

@Override
protected void doClose() {
throw new UnsupportedOperationException();
}

@Override
protected void doDisconnect() {
throw new UnsupportedOperationException();
}

@Override
protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) {
throw new UnsupportedOperationException();
}

@Override
public boolean isActive() {
return false;
}

@Override
protected boolean isCompatible(EventLoop eventLoop) {
return false;
}

@Override
public boolean isOpen() {
return false;
}

@Override
protected SocketAddress localAddress0() {
return localAddress;
}

@Override
public ChannelMetadata metadata() {
return metadata;
}

@Override
protected AbstractUnsafe newUnsafe() {
return new DisposedChannelUnsafe();
}

@Override
protected SocketAddress remoteAddress0() {
return remoteAddress;
}

final class DisposedChannelUnsafe extends AbstractUnsafe {

@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
}
}

static final class DisposedConnection implements Connection {

final Channel channel;

DisposedConnection(Channel copy) {
this.channel = new DisposedChannel(copy);
}

@Override
public Channel channel() {
return channel;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,6 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable

static final int QUEUE_LOW_LIMIT = 32;

final Channel channel;
final ChannelOperations<?, ?> parent;
final EventLoop eventLoop;

Expand Down Expand Up @@ -78,9 +77,8 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable
//reset channel to manual read if re-used

this.parent = parent;
this.channel = parent.channel();
this.eventLoop = channel.eventLoop();
channel.config()
this.eventLoop = parent.channel().eventLoop();
parent.channel().config()
.setAutoRead(false);
CANCEL.lazySet(this, (state) -> {
if (eventLoop.inEventLoop()) {
Expand Down Expand Up @@ -155,7 +153,7 @@ final void startReceiver(CoreSubscriber<? super Object> s) {
if (!subscribedOnce) {
subscribedOnce = true;
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: subscribing inbound receiver"), this);
log.debug(format(parent.channel(), "{}: subscribing inbound receiver"), this);
}
if ((inboundDone && getPending() == 0) || isCancelled()) {
if (inboundError != null) {
Expand All @@ -182,7 +180,7 @@ final void startReceiver(CoreSubscriber<? super Object> s) {
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: Rejecting additional inbound receiver."), this);
log.debug(format(parent.channel(), "{}: Rejecting additional inbound receiver."), this);
}

String msg = "Rejecting additional inbound receiver. State=" + toString(false);
Expand Down Expand Up @@ -218,7 +216,7 @@ final void cleanQueue(@Nullable Queue<Object> q) {
Object o;
while ((o = q.poll()) != null) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(o));
log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(o));
}
ReferenceCountUtil.release(o);
}
Expand Down Expand Up @@ -283,11 +281,11 @@ final void drainReceiver() {
try {
if (logLeakDetection.isDebugEnabled()) {
if (v instanceof ByteBuf) {
((ByteBuf) v).touch(format(channel, "Receiver " + a.getClass().getName() +
((ByteBuf) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() +
" will handle the message from this point"));
}
else if (v instanceof ByteBufHolder) {
((ByteBufHolder) v).touch(format(channel, "Receiver " + a.getClass().getName() +
((ByteBufHolder) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() +
" will handle the message from this point"));
}
}
Expand Down Expand Up @@ -322,7 +320,7 @@ else if (v instanceof ByteBufHolder) {
receiverFastpath = true;
if (needRead) {
needRead = false;
channel.config()
parent.channel().config()
.setAutoRead(true);
}
//CHECKSTYLE:OFF
Expand All @@ -336,13 +334,13 @@ else if (v instanceof ByteBufHolder) {
if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) {
if (needRead) {
needRead = false;
channel.config()
parent.channel().config()
.setAutoRead(true);
}
}
else if (!needRead) {
needRead = true;
channel.config()
parent.channel().config()
.setAutoRead(false);
}

Expand All @@ -358,7 +356,7 @@ else if (!needRead) {
final void onInboundNext(Object msg) {
if (inboundDone || isCancelled()) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg));
log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg));
}
ReferenceCountUtil.release(msg);
return;
Expand All @@ -368,11 +366,11 @@ final void onInboundNext(Object msg) {
try {
if (logLeakDetection.isDebugEnabled()) {
if (msg instanceof ByteBuf) {
((ByteBuf) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() +
((ByteBuf) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() +
" will handle the message from this point"));
}
else if (msg instanceof ByteBufHolder) {
((ByteBufHolder) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() +
((ByteBufHolder) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() +
" will handle the message from this point"));
}
}
Expand All @@ -393,10 +391,10 @@ else if (msg instanceof ByteBufHolder) {
}
if (logLeakDetection.isDebugEnabled()) {
if (msg instanceof ByteBuf) {
((ByteBuf) msg).touch(format(channel, "Buffered ByteBuf in the inbound buffer queue"));
((ByteBuf) msg).touch(format(parent.channel(), "Buffered ByteBuf in the inbound buffer queue"));
}
else if (msg instanceof ByteBufHolder) {
((ByteBufHolder) msg).touch(format(channel, "Buffered ByteBufHolder in the inbound buffer queue"));
((ByteBufHolder) msg).touch(format(parent.channel(), "Buffered ByteBufHolder in the inbound buffer queue"));
}
}
q.offer(msg);
Expand All @@ -423,20 +421,20 @@ final void onInboundError(Throwable err) {
if (isCancelled() || inboundDone) {
if (log.isDebugEnabled()) {
if (AbortedException.isConnectionReset(err)) {
log.debug(format(channel, "Connection reset has been observed post termination"), err);
log.debug(format(parent.channel(), "Connection reset has been observed post termination"), err);
}
else {
log.warn(format(channel, "An exception has been observed post termination"), err);
log.warn(format(parent.channel(), "An exception has been observed post termination"), err);
}
}
else if (log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) {
log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
log.warn(format(parent.channel(), "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
}
return;
}
CoreSubscriber<?> receiver = this.receiver;
this.inboundDone = true;
if (channel.isActive()) {
if (parent.channel().isActive()) {
parent.markPersistent(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package reactor.netty.http.client;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand Down Expand Up @@ -106,6 +107,7 @@
import reactor.netty.ByteBufMono;
import reactor.netty.CancelReceiverHandlerTest;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.LogTracker;
import reactor.netty.NettyPipeline;
Expand Down Expand Up @@ -3401,4 +3403,46 @@ static void testIssue3285SendRequest(HttpClient client, @Nullable Class<? extend
.verify(Duration.ofSeconds(5));
}
}

@Test
void testIssue3416() {
disposableServer =
createServer()
.route(r -> r.get("/", (req, res) -> res.sendString(Mono.just("testIssue3416")))
.ws("/ws", (in, out) -> out.neverComplete()))
.bindNow();

AtomicReference<WeakReference<Connection>> connWeakRef = new AtomicReference<>();
HttpClient client =
createClient(disposableServer.port())
.observe((conn, state) -> {
if (state == ConnectionObserver.State.CONNECTED) {
connWeakRef.compareAndSet(null, new WeakReference<>(conn));
}
});

client.get()
.uri("/")
.response() // Reactor Netty will close the connection
.flatMap(res ->
client.websocket()
.uri("/ws")
.handle((in, out) ->
Flux.range(0, 10)
.delayElements(Duration.ofMillis(100))
.skipUntil(l -> {
boolean result = connWeakRef.get().get() == null;
if (!result) {
System.gc();
}
return result;
})
.switchIfEmpty(Mono.error(new RuntimeException("failed"))
.flatMap(l -> Mono.empty())))
.then()
.contextWrite(Context.of(res.getClass(), res)))
.as(StepVerifier::create)
.expectComplete()
.verify(Duration.ofSeconds(500));
}
}

0 comments on commit c1741b8

Please sign in to comment.