diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java index 165ed1f31e..6f2cf1bef6 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java @@ -25,9 +25,16 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; import io.netty.util.ReferenceCounted; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -123,9 +130,9 @@ else if (recorder instanceof ContextAwareChannelMetricsRecorder) { .as(ChannelOperations.class); } - final Connection connection; + Connection connection; final FluxReceive inbound; - final ConnectionObserver listener; + ConnectionObserver listener; final Sinks.Empty onTerminate; volatile Subscription outboundSubscription; @@ -502,6 +509,8 @@ protected final void terminate() { // and it is guarded by rebind(connection), so tryEmitEmpty() should happen just once onTerminate.tryEmitEmpty(); listener.onStateChange(this, ConnectionObserver.State.DISCONNECTING); + connection = new DisposedConnection(channel()); + listener = ConnectionObserver.emptyListener(); } } @@ -681,4 +690,106 @@ static OnSetup empty() { Subscription.class, "outboundSubscription"); + static final class DisposedChannel extends AbstractChannel { + + final DefaultChannelConfig config; + final SocketAddress localAddress; + final ChannelMetadata metadata; + final SocketAddress remoteAddress; + + DisposedChannel(Channel copy) { + super(null); + this.metadata = copy.metadata(); + this.config = new DefaultChannelConfig(this); + this.localAddress = copy.localAddress(); + this.remoteAddress = copy.remoteAddress(); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + protected void doBeginRead() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBind(SocketAddress socketAddress) { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isActive() { + return false; + } + + @Override + protected boolean isCompatible(EventLoop eventLoop) { + return false; + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + protected SocketAddress localAddress0() { + return localAddress; + } + + @Override + public ChannelMetadata metadata() { + return metadata; + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new DisposedChannelUnsafe(); + } + + @Override + protected SocketAddress remoteAddress0() { + return remoteAddress; + } + + final class DisposedChannelUnsafe extends AbstractUnsafe { + + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + promise.setFailure(new UnsupportedOperationException()); + } + } + } + + static final class DisposedConnection implements Connection { + + final Channel channel; + + DisposedConnection(Channel copy) { + this.channel = new DisposedChannel(copy); + } + + @Override + public Channel channel() { + return channel; + } + } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java b/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java index 15fc027c6d..b6dde9a017 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,6 @@ final class FluxReceive extends Flux implements Subscription, Disposable static final int QUEUE_LOW_LIMIT = 32; - final Channel channel; final ChannelOperations parent; final EventLoop eventLoop; @@ -78,9 +77,8 @@ final class FluxReceive extends Flux implements Subscription, Disposable //reset channel to manual read if re-used this.parent = parent; - this.channel = parent.channel(); - this.eventLoop = channel.eventLoop(); - channel.config() + this.eventLoop = parent.channel().eventLoop(); + parent.channel().config() .setAutoRead(false); CANCEL.lazySet(this, (state) -> { if (eventLoop.inEventLoop()) { @@ -155,7 +153,7 @@ final void startReceiver(CoreSubscriber s) { if (!subscribedOnce) { subscribedOnce = true; if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: subscribing inbound receiver"), this); + log.debug(format(parent.channel(), "{}: subscribing inbound receiver"), this); } if ((inboundDone && getPending() == 0) || isCancelled()) { if (inboundError != null) { @@ -182,7 +180,7 @@ final void startReceiver(CoreSubscriber s) { } else { if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: Rejecting additional inbound receiver."), this); + log.debug(format(parent.channel(), "{}: Rejecting additional inbound receiver."), this); } String msg = "Rejecting additional inbound receiver. State=" + toString(false); @@ -218,7 +216,7 @@ final void cleanQueue(@Nullable Queue q) { Object o; while ((o = q.poll()) != null) { if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(o)); + log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(o)); } ReferenceCountUtil.release(o); } @@ -283,11 +281,11 @@ final void drainReceiver() { try { if (logLeakDetection.isDebugEnabled()) { if (v instanceof ByteBuf) { - ((ByteBuf) v).touch(format(channel, "Receiver " + a.getClass().getName() + + ((ByteBuf) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() + " will handle the message from this point")); } else if (v instanceof ByteBufHolder) { - ((ByteBufHolder) v).touch(format(channel, "Receiver " + a.getClass().getName() + + ((ByteBufHolder) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() + " will handle the message from this point")); } } @@ -322,7 +320,7 @@ else if (v instanceof ByteBufHolder) { receiverFastpath = true; if (needRead) { needRead = false; - channel.config() + parent.channel().config() .setAutoRead(true); } //CHECKSTYLE:OFF @@ -336,13 +334,13 @@ else if (v instanceof ByteBufHolder) { if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) { if (needRead) { needRead = false; - channel.config() + parent.channel().config() .setAutoRead(true); } } else if (!needRead) { needRead = true; - channel.config() + parent.channel().config() .setAutoRead(false); } @@ -358,7 +356,7 @@ else if (!needRead) { final void onInboundNext(Object msg) { if (inboundDone || isCancelled()) { if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg)); + log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg)); } ReferenceCountUtil.release(msg); return; @@ -368,11 +366,11 @@ final void onInboundNext(Object msg) { try { if (logLeakDetection.isDebugEnabled()) { if (msg instanceof ByteBuf) { - ((ByteBuf) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() + + ((ByteBuf) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() + " will handle the message from this point")); } else if (msg instanceof ByteBufHolder) { - ((ByteBufHolder) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() + + ((ByteBufHolder) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() + " will handle the message from this point")); } } @@ -393,10 +391,10 @@ else if (msg instanceof ByteBufHolder) { } if (logLeakDetection.isDebugEnabled()) { if (msg instanceof ByteBuf) { - ((ByteBuf) msg).touch(format(channel, "Buffered ByteBuf in the inbound buffer queue")); + ((ByteBuf) msg).touch(format(parent.channel(), "Buffered ByteBuf in the inbound buffer queue")); } else if (msg instanceof ByteBufHolder) { - ((ByteBufHolder) msg).touch(format(channel, "Buffered ByteBufHolder in the inbound buffer queue")); + ((ByteBufHolder) msg).touch(format(parent.channel(), "Buffered ByteBufHolder in the inbound buffer queue")); } } q.offer(msg); @@ -423,20 +421,20 @@ final void onInboundError(Throwable err) { if (isCancelled() || inboundDone) { if (log.isDebugEnabled()) { if (AbortedException.isConnectionReset(err)) { - log.debug(format(channel, "Connection reset has been observed post termination"), err); + log.debug(format(parent.channel(), "Connection reset has been observed post termination"), err); } else { - log.warn(format(channel, "An exception has been observed post termination"), err); + log.warn(format(parent.channel(), "An exception has been observed post termination"), err); } } else if (log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) { - log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString()); + log.warn(format(parent.channel(), "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString()); } return; } CoreSubscriber receiver = this.receiver; this.inboundDone = true; - if (channel.isActive()) { + if (parent.channel().isActive()) { parent.markPersistent(false); } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 5fff508aba..4fa60dcc8f 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -16,6 +16,7 @@ package reactor.netty.http.client; import java.io.IOException; +import java.lang.ref.WeakReference; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -106,6 +107,7 @@ import reactor.netty.ByteBufMono; import reactor.netty.CancelReceiverHandlerTest; import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; import reactor.netty.FutureMono; import reactor.netty.LogTracker; import reactor.netty.NettyPipeline; @@ -3401,4 +3403,46 @@ static void testIssue3285SendRequest(HttpClient client, @Nullable Class r.get("/", (req, res) -> res.sendString(Mono.just("testIssue3416"))) + .ws("/ws", (in, out) -> out.neverComplete())) + .bindNow(); + + AtomicReference> connWeakRef = new AtomicReference<>(); + HttpClient client = + createClient(disposableServer.port()) + .observe((conn, state) -> { + if (state == ConnectionObserver.State.CONNECTED) { + connWeakRef.compareAndSet(null, new WeakReference<>(conn)); + } + }); + + client.get() + .uri("/") + .response() // Reactor Netty will close the connection + .flatMap(res -> + client.websocket() + .uri("/ws") + .handle((in, out) -> + Flux.range(0, 10) + .delayElements(Duration.ofMillis(100)) + .skipUntil(l -> { + boolean result = connWeakRef.get().get() == null; + if (!result) { + System.gc(); + } + return result; + }) + .switchIfEmpty(Mono.error(new RuntimeException("failed")) + .flatMap(l -> Mono.empty()))) + .then() + .contextWrite(Context.of(res.getClass(), res))) + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(500)); + } }