From 395377f181b11c47e6e86286739d2a5fe4cc6cc2 Mon Sep 17 00:00:00 2001 From: Filip Hrisafov Date: Fri, 9 Feb 2024 09:46:32 +0100 Subject: [PATCH] Copy the Spring ReactorClientHttpConnector and adapt it to be used without spring-context --- .../FlowableReactorClientHttpConnector.java | 106 ++++++++++ .../FlowableReactorClientHttpRequest.java | 146 ++++++++++++++ .../FlowableReactorClientHttpResponse.java | 186 ++++++++++++++++++ .../SpringWebClientFlowableHttpClient.java | 3 +- 4 files changed, 439 insertions(+), 2 deletions(-) create mode 100644 modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpConnector.java create mode 100644 modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpRequest.java create mode 100644 modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpResponse.java diff --git a/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpConnector.java b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpConnector.java new file mode 100644 index 00000000000..fa9dc5766da --- /dev/null +++ b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpConnector.java @@ -0,0 +1,106 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.flowable.http.common.impl.spring.reactive; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.ClientHttpRequest; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.util.Assert; + +import reactor.core.publisher.Mono; +import reactor.netty.NettyOutbound; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientRequest; + +// This is a copy of org.springframework.http.client.reactive.ReactorClientHttpConnector +// the name has been changed to indicate that this is an adaptation of the Spring ReactorClientHttpConnector +// The difference is the removal of the SmartLifecycle and the not used methods by Flowable. +// The reason for the adaptation is due to https://github.com/spring-projects/spring-framework/issues/31180#issuecomment-1934453468 + +/** + * Reactor-Netty implementation of {@link ClientHttpConnector}. + * + * @author Brian Clozel + * @author Rossen Stoyanchev + * @author Sebastien Deleuze + * @see reactor.netty.http.client.HttpClient + * @since 5.0 + */ +class FlowableReactorClientHttpConnector implements ClientHttpConnector { + + private final HttpClient httpClient; + + /** + * Constructor with a pre-configured {@code HttpClient} instance. + * + * @param httpClient the client to use + * @since 5.1 + */ + public FlowableReactorClientHttpConnector(HttpClient httpClient) { + Assert.notNull(httpClient, "HttpClient is required"); + this.httpClient = httpClient; + } + + @Override + public Mono connect(HttpMethod method, URI uri, + Function> requestCallback) { + + AtomicReference responseRef = new AtomicReference<>(); + + HttpClient.RequestSender requestSender = this.httpClient + .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())); + + requestSender = setUri(requestSender, uri); + + return requestSender + .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) + .responseConnection((response, connection) -> { + responseRef.set(new FlowableReactorClientHttpResponse(response, connection)); + return Mono.just((ClientHttpResponse) responseRef.get()); + }) + .next() + .doOnCancel(() -> { + FlowableReactorClientHttpResponse response = responseRef.get(); + if (response != null) { + response.releaseAfterCancel(method); + } + }); + } + + private static HttpClient.RequestSender setUri(HttpClient.RequestSender requestSender, URI uri) { + if (uri.isAbsolute()) { + try { + return requestSender.uri(uri); + } catch (Exception ex) { + // Fall back on passing it in as a String + } + } + return requestSender.uri(uri.toString()); + } + + private FlowableReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, + NettyOutbound nettyOutbound) { + + return new FlowableReactorClientHttpRequest(method, uri, request, nettyOutbound); + } + +} diff --git a/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpRequest.java b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpRequest.java new file mode 100644 index 00000000000..e589b400244 --- /dev/null +++ b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpRequest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.flowable.http.common.impl.spring.reactive; + +import java.net.URI; +import java.nio.file.Path; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.http.client.reactive.AbstractClientHttpRequest; +import org.springframework.http.support.Netty4HeadersAdapter; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.cookie.DefaultCookie; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.NettyOutbound; +import reactor.netty.http.client.HttpClientRequest; + +// This is a copy of org.springframework.http.client.reactive.ReactorClientHttpRequest +// the name has been changed to indicate that this is an adaptation of the Spring ReactorClientHttpRequest +// This is needed since the Spring class is package protected, and we needed a custom FlowableReactorClientHttpConnector. +// See comments in for FlowableReactorClientHttpConnector more information + +/** + * {@link org.springframework.http.client.reactive.ClientHttpRequest ClientHttpRequest} implementation for the Reactor-Netty HTTP client. + * + * @author Brian Clozel + * @author Rossen Stoyanchev + * @see reactor.netty.http.client.HttpClient + * @since 5.0 + */ +class FlowableReactorClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage { + + private final HttpMethod httpMethod; + + private final URI uri; + + private final HttpClientRequest request; + + private final NettyOutbound outbound; + + private final NettyDataBufferFactory bufferFactory; + + public FlowableReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) { + this.httpMethod = method; + this.uri = uri; + this.request = request; + this.outbound = outbound; + this.bufferFactory = new NettyDataBufferFactory(outbound.alloc()); + } + + @Override + public HttpMethod getMethod() { + return this.httpMethod; + } + + @Override + public URI getURI() { + return this.uri; + } + + @Override + public DataBufferFactory bufferFactory() { + return this.bufferFactory; + } + + @Override + @SuppressWarnings("unchecked") + public T getNativeRequest() { + return (T) this.request; + } + + @Override + public Mono writeWith(Publisher body) { + return doCommit(() -> { + // Send as Mono if possible as an optimization hint to Reactor Netty + if (body instanceof Mono) { + Mono byteBufMono = Mono.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufMono).then(); + + } else { + Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufFlux).then(); + } + }); + } + + @Override + public Mono writeAndFlushWith(Publisher> body) { + Publisher> byteBufs = Flux.from(body).map(FlowableReactorClientHttpRequest::toByteBufs); + return doCommit(() -> this.outbound.sendGroups(byteBufs).then()); + } + + private static Publisher toByteBufs(Publisher dataBuffers) { + return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf); + } + + @Override + public Mono writeWith(Path file, long position, long count) { + return doCommit(() -> this.outbound.sendFile(file, position, count).then()); + } + + @Override + public Mono setComplete() { + return doCommit(this.outbound::then); + } + + @Override + protected void applyHeaders() { + getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value)); + } + + @Override + protected void applyCookies() { + getCookies().values().forEach(values -> values.forEach(value -> { + DefaultCookie cookie = new DefaultCookie(value.getName(), value.getValue()); + this.request.addCookie(cookie); + })); + } + + @Override + protected HttpHeaders initReadOnlyHeaders() { + return HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(this.request.requestHeaders())); + } + +} diff --git a/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpResponse.java b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpResponse.java new file mode 100644 index 00000000000..d0edb91ced1 --- /dev/null +++ b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/FlowableReactorClientHttpResponse.java @@ -0,0 +1,186 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.flowable.http.common.impl.spring.reactive; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseCookie; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.support.Netty4HeadersAdapter; +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.ObjectUtils; + +import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http.cookie.DefaultCookie; +import reactor.core.publisher.Flux; +import reactor.netty.ChannelOperationsId; +import reactor.netty.Connection; +import reactor.netty.NettyInbound; +import reactor.netty.http.client.HttpClientResponse; + +// This is a copy of org.springframework.http.client.reactive.ReactorClientHttpResponse +// the name has been changed to indicate that this is an adaptation of the Spring ReactorClientHttpResponse +// This is needed since the Spring class is package protected, and we needed a custom FlowableReactorClientHttpConnector. +// See comments in for FlowableReactorClientHttpConnector more information + +/** + * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. + * + * @author Brian Clozel + * @author Rossen Stoyanchev + * @see reactor.netty.http.client.HttpClient + * @since 5.0 + */ +class FlowableReactorClientHttpResponse implements ClientHttpResponse { + + private static final Log logger = LogFactory.getLog(FlowableReactorClientHttpResponse.class); + + private final HttpClientResponse response; + + private final HttpHeaders headers; + + private final NettyInbound inbound; + + private final NettyDataBufferFactory bufferFactory; + + // 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe) + private final AtomicInteger state = new AtomicInteger(); + + /** + * Constructor that matches the inputs from + * {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}. + * + * @since 5.2.8 + */ + public FlowableReactorClientHttpResponse(HttpClientResponse response, Connection connection) { + this.response = response; + MultiValueMap adapter = new Netty4HeadersAdapter(response.responseHeaders()); + this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + this.inbound = connection.inbound(); + this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); + } + + @Override + public String getId() { + String id = null; + if (this.response instanceof ChannelOperationsId operationsId) { + id = (logger.isDebugEnabled() ? operationsId.asLongText() : operationsId.asShortText()); + } + if (id == null && this.response instanceof Connection connection) { + id = connection.channel().id().asShortText(); + } + return (id != null ? id : ObjectUtils.getIdentityHexString(this)); + } + + @Override + public Flux getBody() { + return this.inbound.receive() + .doOnSubscribe(s -> { + if (this.state.compareAndSet(0, 1)) { + return; + } + if (this.state.get() == 2) { + throw new IllegalStateException( + "The client response body has been released already due to cancellation."); + } + }) + .map(byteBuf -> { + byteBuf.retain(); + return this.bufferFactory.wrap(byteBuf); + }); + } + + @Override + public HttpHeaders getHeaders() { + return this.headers; + } + + @Override + public HttpStatusCode getStatusCode() { + return HttpStatusCode.valueOf(this.response.status().code()); + } + + @Override + public MultiValueMap getCookies() { + MultiValueMap result = new LinkedMultiValueMap<>(); + this.response.cookies().values().stream() + .flatMap(Collection::stream) + .forEach(cookie -> result.add(cookie.name(), + ResponseCookie.fromClientResponse(cookie.name(), cookie.value()) + .domain(cookie.domain()) + .path(cookie.path()) + .maxAge(cookie.maxAge()) + .secure(cookie.isSecure()) + .httpOnly(cookie.isHttpOnly()) + .sameSite(getSameSite(cookie)) + .build())); + return CollectionUtils.unmodifiableMultiValueMap(result); + } + + @Nullable + private static String getSameSite(Cookie cookie) { + if (cookie instanceof DefaultCookie defaultCookie && defaultCookie.sameSite() != null) { + return defaultCookie.sameSite().name(); + } + return null; + } + + /** + * Called by {@link FlowableReactorClientHttpConnector} when a cancellation is detected + * but the content has not been subscribed to. If the subscription never + * materializes then the content will remain not drained. Or it could still + * materialize if the cancellation happened very early, or the response + * reading was delayed for some reason. + */ + void releaseAfterCancel(HttpMethod method) { + if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) { + if (logger.isDebugEnabled()) { + logger.debug("[" + getId() + "]" + "Releasing body, not yet subscribed."); + } + this.inbound.receive().doOnNext(byteBuf -> { + }).subscribe(byteBuf -> { + }, ex -> { + }); + } + } + + private boolean mayHaveBody(HttpMethod method) { + int code = getStatusCode().value(); + return !((code >= 100 && code < 200) || code == 204 || code == 205 || + method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0); + } + + @Override + public String toString() { + return "FlowableReactorClientHttpResponse{" + + "request=[" + this.response.method().name() + " " + this.response.uri() + "]," + + "status=" + getStatusCode() + '}'; + } + +} diff --git a/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/SpringWebClientFlowableHttpClient.java b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/SpringWebClientFlowableHttpClient.java index 559aae82bd3..e9db263b052 100644 --- a/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/SpringWebClientFlowableHttpClient.java +++ b/modules/flowable-http-common/src/main/java/org/flowable/http/common/impl/spring/reactive/SpringWebClientFlowableHttpClient.java @@ -38,7 +38,6 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.MultipartBodyBuilder; -import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; @@ -90,7 +89,7 @@ public SpringWebClientFlowableHttpClient(HttpClientConfig config) { .addHandlerLast(new ReadTimeoutHandler(config.getSocketTimeout(), TimeUnit.MILLISECONDS))); WebClient.Builder webClientBuilder = WebClient.builder(); - webClientBuilder = webClientBuilder.clientConnector(new ReactorClientHttpConnector(httpClient)); + webClientBuilder = webClientBuilder.clientConnector(new FlowableReactorClientHttpConnector(httpClient)); this.webClient = webClientBuilder.build(); this.initialRequestTimeout = Duration.ofMillis(config.getSocketTimeout());