From 04ec2b855ea27d53cbb67dca7bf38b88bed18e61 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 3 Oct 2024 14:19:05 +0300 Subject: [PATCH 1/5] When terminating detach the connection from request/response objects Related to #3416, #3367 --- .../netty/channel/ChannelOperations.java | 115 +++++++++++++++++- .../reactor/netty/channel/FluxReceive.java | 42 +++---- .../netty/http/client/HttpClientTest.java | 44 +++++++ 3 files changed, 177 insertions(+), 24 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java index 165ed1f31e..6f2cf1bef6 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java @@ -25,9 +25,16 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; import io.netty.util.ReferenceCounted; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -123,9 +130,9 @@ else if (recorder instanceof ContextAwareChannelMetricsRecorder) { .as(ChannelOperations.class); } - final Connection connection; + Connection connection; final FluxReceive inbound; - final ConnectionObserver listener; + ConnectionObserver listener; final Sinks.Empty onTerminate; volatile Subscription outboundSubscription; @@ -502,6 +509,8 @@ protected final void terminate() { // and it is guarded by rebind(connection), so tryEmitEmpty() should happen just once onTerminate.tryEmitEmpty(); listener.onStateChange(this, ConnectionObserver.State.DISCONNECTING); + connection = new DisposedConnection(channel()); + listener = ConnectionObserver.emptyListener(); } } @@ -681,4 +690,106 @@ static OnSetup empty() { Subscription.class, "outboundSubscription"); + static final class DisposedChannel extends AbstractChannel { + + final DefaultChannelConfig config; + final SocketAddress localAddress; + final ChannelMetadata metadata; + final SocketAddress remoteAddress; + + DisposedChannel(Channel copy) { + super(null); + this.metadata = copy.metadata(); + this.config = new DefaultChannelConfig(this); + this.localAddress = copy.localAddress(); + this.remoteAddress = copy.remoteAddress(); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + protected void doBeginRead() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBind(SocketAddress socketAddress) { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isActive() { + return false; + } + + @Override + protected boolean isCompatible(EventLoop eventLoop) { + return false; + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + protected SocketAddress localAddress0() { + return localAddress; + } + + @Override + public ChannelMetadata metadata() { + return metadata; + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new DisposedChannelUnsafe(); + } + + @Override + protected SocketAddress remoteAddress0() { + return remoteAddress; + } + + final class DisposedChannelUnsafe extends AbstractUnsafe { + + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + promise.setFailure(new UnsupportedOperationException()); + } + } + } + + static final class DisposedConnection implements Connection { + + final Channel channel; + + DisposedConnection(Channel copy) { + this.channel = new DisposedChannel(copy); + } + + @Override + public Channel channel() { + return channel; + } + } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java b/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java index 15fc027c6d..b6dde9a017 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,6 @@ final class FluxReceive extends Flux implements Subscription, Disposable static final int QUEUE_LOW_LIMIT = 32; - final Channel channel; final ChannelOperations parent; final EventLoop eventLoop; @@ -78,9 +77,8 @@ final class FluxReceive extends Flux implements Subscription, Disposable //reset channel to manual read if re-used this.parent = parent; - this.channel = parent.channel(); - this.eventLoop = channel.eventLoop(); - channel.config() + this.eventLoop = parent.channel().eventLoop(); + parent.channel().config() .setAutoRead(false); CANCEL.lazySet(this, (state) -> { if (eventLoop.inEventLoop()) { @@ -155,7 +153,7 @@ final void startReceiver(CoreSubscriber s) { if (!subscribedOnce) { subscribedOnce = true; if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: subscribing inbound receiver"), this); + log.debug(format(parent.channel(), "{}: subscribing inbound receiver"), this); } if ((inboundDone && getPending() == 0) || isCancelled()) { if (inboundError != null) { @@ -182,7 +180,7 @@ final void startReceiver(CoreSubscriber s) { } else { if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: Rejecting additional inbound receiver."), this); + log.debug(format(parent.channel(), "{}: Rejecting additional inbound receiver."), this); } String msg = "Rejecting additional inbound receiver. State=" + toString(false); @@ -218,7 +216,7 @@ final void cleanQueue(@Nullable Queue q) { Object o; while ((o = q.poll()) != null) { if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(o)); + log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(o)); } ReferenceCountUtil.release(o); } @@ -283,11 +281,11 @@ final void drainReceiver() { try { if (logLeakDetection.isDebugEnabled()) { if (v instanceof ByteBuf) { - ((ByteBuf) v).touch(format(channel, "Receiver " + a.getClass().getName() + + ((ByteBuf) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() + " will handle the message from this point")); } else if (v instanceof ByteBufHolder) { - ((ByteBufHolder) v).touch(format(channel, "Receiver " + a.getClass().getName() + + ((ByteBufHolder) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() + " will handle the message from this point")); } } @@ -322,7 +320,7 @@ else if (v instanceof ByteBufHolder) { receiverFastpath = true; if (needRead) { needRead = false; - channel.config() + parent.channel().config() .setAutoRead(true); } //CHECKSTYLE:OFF @@ -336,13 +334,13 @@ else if (v instanceof ByteBufHolder) { if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) { if (needRead) { needRead = false; - channel.config() + parent.channel().config() .setAutoRead(true); } } else if (!needRead) { needRead = true; - channel.config() + parent.channel().config() .setAutoRead(false); } @@ -358,7 +356,7 @@ else if (!needRead) { final void onInboundNext(Object msg) { if (inboundDone || isCancelled()) { if (log.isDebugEnabled()) { - log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg)); + log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg)); } ReferenceCountUtil.release(msg); return; @@ -368,11 +366,11 @@ final void onInboundNext(Object msg) { try { if (logLeakDetection.isDebugEnabled()) { if (msg instanceof ByteBuf) { - ((ByteBuf) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() + + ((ByteBuf) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() + " will handle the message from this point")); } else if (msg instanceof ByteBufHolder) { - ((ByteBufHolder) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() + + ((ByteBufHolder) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() + " will handle the message from this point")); } } @@ -393,10 +391,10 @@ else if (msg instanceof ByteBufHolder) { } if (logLeakDetection.isDebugEnabled()) { if (msg instanceof ByteBuf) { - ((ByteBuf) msg).touch(format(channel, "Buffered ByteBuf in the inbound buffer queue")); + ((ByteBuf) msg).touch(format(parent.channel(), "Buffered ByteBuf in the inbound buffer queue")); } else if (msg instanceof ByteBufHolder) { - ((ByteBufHolder) msg).touch(format(channel, "Buffered ByteBufHolder in the inbound buffer queue")); + ((ByteBufHolder) msg).touch(format(parent.channel(), "Buffered ByteBufHolder in the inbound buffer queue")); } } q.offer(msg); @@ -423,20 +421,20 @@ final void onInboundError(Throwable err) { if (isCancelled() || inboundDone) { if (log.isDebugEnabled()) { if (AbortedException.isConnectionReset(err)) { - log.debug(format(channel, "Connection reset has been observed post termination"), err); + log.debug(format(parent.channel(), "Connection reset has been observed post termination"), err); } else { - log.warn(format(channel, "An exception has been observed post termination"), err); + log.warn(format(parent.channel(), "An exception has been observed post termination"), err); } } else if (log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) { - log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString()); + log.warn(format(parent.channel(), "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString()); } return; } CoreSubscriber receiver = this.receiver; this.inboundDone = true; - if (channel.isActive()) { + if (parent.channel().isActive()) { parent.markPersistent(false); } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 5fff508aba..8be23fc79e 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -16,6 +16,7 @@ package reactor.netty.http.client; import java.io.IOException; +import java.lang.ref.WeakReference; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -106,6 +107,7 @@ import reactor.netty.ByteBufMono; import reactor.netty.CancelReceiverHandlerTest; import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; import reactor.netty.FutureMono; import reactor.netty.LogTracker; import reactor.netty.NettyPipeline; @@ -3401,4 +3403,46 @@ static void testIssue3285SendRequest(HttpClient client, @Nullable Class r.get("/", (req, res) -> res.sendString(Mono.just("testIssue3416"))) + .ws("/ws", (in, out) -> out.neverComplete())) + .bindNow(); + + AtomicReference> connWeakRef = new AtomicReference<>(); + HttpClient client = + createClient(disposableServer.port()) + .observe((conn, state) -> { + if (state == ConnectionObserver.State.CONNECTED) { + connWeakRef.compareAndSet(null, new WeakReference<>(conn)); + } + }); + + client.get() + .uri("/") + .response() // Reactor Netty will close the connection + .flatMap(res -> + client.websocket() + .uri("/ws") + .handle((in, out) -> + Flux.range(0, 10) + .delayElements(Duration.ofMillis(100)) + .skipUntil(l -> { + boolean result = connWeakRef.get().get() == null; + if (!result) { + System.gc(); + } + return result; + }) + .switchIfEmpty(Mono.error(new RuntimeException("failed")) + .flatMap(l -> Mono.empty()))) + .then() + .contextWrite(Context.of(res.getClass(), res))) + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } } From 14c96a4c175df2b0439dcf81e09a159bb74d1afb Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 7 Oct 2024 21:31:10 +0300 Subject: [PATCH 2/5] Move registration to terminate event outside of doFinally when we have the real connection Obtain the event loop outside of doFinally when we have the real connection --- .../brave/TracingHttpServerDecorator.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java b/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java index aade901a09..87bc346173 100644 --- a/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java +++ b/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -266,6 +266,12 @@ static final class TracingMapHandle implements BiFunction, Connection public Mono apply(Mono voidMono, Connection connection) { HttpServerRequest braveRequest = connection.channel().attr(REQUEST_ATTR_KEY).get(); Span span = connection.channel().attr(SPAN_ATTR_KEY).get(); + connection.onTerminate() + .subscribe( + null, + t -> cleanup(connection.channel()), + () -> cleanup(connection.channel())); + EventLoop eventLoop = connection.channel().eventLoop(); return voidMono.doFinally(sig -> { if (braveRequest.unwrap() instanceof reactor.netty.http.server.HttpServerResponse) { reactor.netty.http.server.HttpServerResponse response = @@ -273,20 +279,12 @@ public Mono apply(Mono voidMono, Connection connection) { Span localSpan = sig == SignalType.CANCEL ? span.annotate("cancel") : span; HttpServerResponse braveResponse = new DelegatingHttpResponse(response, braveRequest, throwable); - response.withConnection(conn -> { - conn.onTerminate() - .subscribe( - null, - t -> cleanup(connection.channel()), - () -> cleanup(connection.channel())); - EventLoop eventLoop = conn.channel().eventLoop(); - if (eventLoop.inEventLoop()) { - handler.handleSend(braveResponse, localSpan); - } - else { - eventLoop.execute(() -> handler.handleSend(braveResponse, localSpan)); - } - }); + if (eventLoop.inEventLoop()) { + handler.handleSend(braveResponse, localSpan); + } + else { + eventLoop.execute(() -> handler.handleSend(braveResponse, localSpan)); + } } }) .doOnError(this::throwable) From aa4cf84e3b38b10a570c257c21ba627a90fdf84d Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 7 Oct 2024 22:17:19 +0300 Subject: [PATCH 3/5] CloseFuture is always indicating that the channel is closed --- .../main/java/reactor/netty/channel/ChannelOperations.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java index 6f2cf1bef6..bdbb99389e 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java @@ -28,6 +28,7 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelMetadata; @@ -705,6 +706,11 @@ static final class DisposedChannel extends AbstractChannel { this.remoteAddress = copy.remoteAddress(); } + @Override + public ChannelFuture closeFuture() { + return newSucceededFuture(); + } + @Override public ChannelConfig config() { return config; From b7aa7a4db74f8ce6e175483976b12cd9768ff2cd Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 9 Oct 2024 15:56:36 +0300 Subject: [PATCH 4/5] Dispose the connection only if it is not disposed --- .../main/java/reactor/netty/channel/ChannelOperations.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java index bdbb99389e..4a866fbd25 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java @@ -214,7 +214,9 @@ public void dispose() { if (!inbound.isDisposed()) { discard(); } - connection.dispose(); + if (!connection.isDisposed()) { + connection.dispose(); + } } @Override From 26b7bd70cbd2124753d42b3ce6f4eab9ac1f6089 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 17 Oct 2024 09:51:38 +0300 Subject: [PATCH 5/5] Address feedback --- .../reactor/netty/channel/ChannelOperations.java | 12 ++++++------ .../netty/http/brave/TracingHttpServerDecorator.java | 2 ++ .../reactor/netty/http/client/HttpClientTest.java | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java index 4a866fbd25..8ae1cfca30 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java @@ -700,12 +700,12 @@ static final class DisposedChannel extends AbstractChannel { final ChannelMetadata metadata; final SocketAddress remoteAddress; - DisposedChannel(Channel copy) { + DisposedChannel(Channel actual) { super(null); - this.metadata = copy.metadata(); + this.metadata = actual.metadata(); this.config = new DefaultChannelConfig(this); - this.localAddress = copy.localAddress(); - this.remoteAddress = copy.remoteAddress(); + this.localAddress = actual.localAddress(); + this.remoteAddress = actual.remoteAddress(); } @Override @@ -791,8 +791,8 @@ static final class DisposedConnection implements Connection { final Channel channel; - DisposedConnection(Channel copy) { - this.channel = new DisposedChannel(copy); + DisposedConnection(Channel actual) { + this.channel = new DisposedChannel(actual); } @Override diff --git a/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java b/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java index 87bc346173..625b59bd31 100644 --- a/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java +++ b/reactor-netty-http-brave/src/main/java/reactor/netty/http/brave/TracingHttpServerDecorator.java @@ -271,6 +271,8 @@ public Mono apply(Mono voidMono, Connection connection) { null, t -> cleanup(connection.channel()), () -> cleanup(connection.channel())); + // At the point of doFinally the connection might be disposed and there might be no event loop + // associated with the disposed connection EventLoop eventLoop = connection.channel().eventLoop(); return voidMono.doFinally(sig -> { if (braveRequest.unwrap() instanceof reactor.netty.http.server.HttpServerResponse) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 8be23fc79e..9a6e8170f8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -3424,7 +3424,7 @@ void testIssue3416() { client.get() .uri("/") .response() // Reactor Netty will close the connection - .flatMap(res -> + .flatMap(res -> // Keep response object alive and at the same time check that the real connection can be GCed client.websocket() .uri("/ws") .handle((in, out) ->