Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize to support spring webflux in ark #786

Closed
wants to merge 12 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@
<artifactId>sofa-ark-springboot-starter</artifactId>

<dependencies>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.22</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
Expand Down Expand Up @@ -132,6 +126,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-loader</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.22</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package com.alipay.sofa.ark.springboot;

import com.alipay.sofa.ark.springboot.condition.ConditionalOnArkEnabled;
import com.alipay.sofa.ark.springboot.processor.ArkEventHandlerProcessor;
import com.alipay.sofa.ark.springboot.processor.ArkServiceInjectProcessor;
import com.alipay.sofa.ark.springboot.web.ArkNettyReactiveWebServerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
Expand All @@ -40,16 +38,7 @@
@Configuration
@ConditionalOnArkEnabled
@AutoConfigureBefore(ReactiveWebServerFactoryAutoConfiguration.class)
public class ArkReactiveAutoConfigure {
@Bean
public static ArkServiceInjectProcessor serviceInjectProcessor() {
return new ArkServiceInjectProcessor();
}

@Bean
public static ArkEventHandlerProcessor arkEventHandlerProcessor() {
return new ArkEventHandlerProcessor();
}
public class ArkReactiveAutoConfiguration {

@Configuration
@ConditionalOnMissingBean({ ReactiveWebServerFactory.class })
Expand All @@ -61,7 +50,9 @@ static class EmbeddedNetty {
@Bean
@ConditionalOnMissingBean
ReactorResourceFactory reactorServerResourceFactory() {
return new ReactorResourceFactory();
ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
reactorResourceFactory.setUseGlobalResources(false);
return reactorResourceFactory;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package com.alipay.sofa.ark.springboot;

import com.alipay.sofa.ark.springboot.condition.ConditionalOnArkEnabled;
import com.alipay.sofa.ark.springboot.processor.ArkEventHandlerProcessor;
import com.alipay.sofa.ark.springboot.processor.ArkServiceInjectProcessor;
import com.alipay.sofa.ark.springboot.web.ArkTomcatServletWebServerFactory;
import org.apache.catalina.startup.Tomcat;
import org.apache.coyote.UpgradeProtocol;
Expand All @@ -42,7 +40,7 @@
@ConditionalOnArkEnabled
@ConditionalOnClass(ServletWebServerFactoryAutoConfiguration.class)
@AutoConfigureBefore(ServletWebServerFactoryAutoConfiguration.class)
public class ArkAutoConfiguration {
public class ArkServletAutoConfiguration {

@Configuration
@ConditionalOnClass(value = { Servlet.class, Tomcat.class, UpgradeProtocol.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.ark.springboot.web;

import com.alipay.sofa.ark.exception.ArkRuntimeException;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author: yuanyuan
*/
public class ArkCompositeReactorHttpHandlerAdapter extends ReactorHttpHandlerAdapter {

private Map<String, ReactorHttpHandlerAdapter> bizReactorHttpHandlerAdapters = new ConcurrentHashMap<>();

public ArkCompositeReactorHttpHandlerAdapter(HttpHandler httpHandler) {
super(httpHandler);
}

@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
String uri = reactorRequest.uri();
for (Map.Entry<String, ReactorHttpHandlerAdapter> entry : bizReactorHttpHandlerAdapters
.entrySet()) {
if (uri.startsWith(entry.getKey())) {
ReactorHttpHandlerAdapter adapter = entry.getValue();
return adapter.apply(reactorRequest, reactorResponse);
}
}
return super.apply(reactorRequest, reactorResponse);
}

public void registerBizReactorHttpHandlerAdapter(String contextPath,
ReactorHttpHandlerAdapter reactorHttpHandlerAdapter) {
ReactorHttpHandlerAdapter old = bizReactorHttpHandlerAdapters.putIfAbsent(contextPath,
reactorHttpHandlerAdapter);
if (old != null) {
throw new ArkRuntimeException("Duplicated context path");
}
}

public void unregisterBizReactorHttpHandlerAdapter(String contextPath) {
bizReactorHttpHandlerAdapters.remove(contextPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,51 +39,62 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.alipay.sofa.ark.spi.constant.Constants.ROOT_WEB_CONTEXT_PATH;

public class ArkNettyReactiveWebServerFactory extends NettyReactiveWebServerFactory {
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private Duration lifecycleTimeout;
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private Duration lifecycleTimeout;

private List<NettyRouteProvider> routeProviders = new ArrayList();
private List<NettyRouteProvider> routeProviders = new ArrayList();
@ArkInject
private EmbeddedServerService embeddedNettyService;
private EmbeddedServerService embeddedNettyService;

@ArkInject
private BizManagerService bizManagerService;
private BizManagerService bizManagerService;

private boolean useForwardHeaders;
private boolean useForwardHeaders;

private ReactorResourceFactory resourceFactory;
private ReactorResourceFactory resourceFactory;

private int backgroundProcessorDelay;
private Set<NettyServerCustomizer> serverCustomizers = new LinkedHashSet();
private int backgroundProcessorDelay;
private Set<NettyServerCustomizer> serverCustomizers = new LinkedHashSet();

public ArkNettyReactiveWebServerFactory() {
}
private static ArkCompositeReactorHttpHandlerAdapter adapter;

public ArkNettyReactiveWebServerFactory(int port) {
super(port);
public ArkNettyReactiveWebServerFactory() {
}

@Override
public WebServer getWebServer(HttpHandler httpHandler) {
if (embeddedNettyService == null) {
return super.getWebServer(httpHandler);
} else if (embeddedNettyService.getEmbedServer() == null) {
embeddedNettyService.setEmbedServer(initEmbedNetty());
}

String contextPath = getContextPath();
Map<String, HttpHandler> handlerMap = new HashMap<>();
handlerMap.put(contextPath, httpHandler);
ContextPathCompositeHandler contextHandler = new ContextPathCompositeHandler(handlerMap);

if (embeddedNettyService == null) {
return super.getWebServer(contextHandler);
} else if (embeddedNettyService.getEmbedServer() == null) {
embeddedNettyService.setEmbedServer(initEmbedNetty());
if (adapter == null) {
adapter = new ArkCompositeReactorHttpHandlerAdapter(contextHandler);
} else {
adapter.registerBizReactorHttpHandlerAdapter(contextPath,
new ReactorHttpHandlerAdapter(contextHandler));
}

HttpServer httpServer = (HttpServer) embeddedNettyService.getEmbedServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(contextHandler);
ArkNettyWebServer webServer = (ArkNettyWebServer) createNettyWebServer(httpServer,
handlerAdapter, lifecycleTimeout);
ArkNettyWebServer webServer = (ArkNettyWebServer) createNettyWebServer(contextPath,
httpServer, adapter, lifecycleTimeout);
webServer.setRouteProviders(this.routeProviders);

return webServer;
Expand All @@ -103,15 +114,20 @@ public String getContextPath() {
if (StringUtils.isEmpty(biz.getWebContextPath())) {
return ROOT_WEB_CONTEXT_PATH;
}
return biz.getWebContextPath();
contextPath = biz.getWebContextPath();
if (!contextPath.startsWith("/")) {
contextPath = "/" + contextPath;
}
return contextPath;
} else {
return ROOT_WEB_CONTEXT_PATH;
}
}

WebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
WebServer createNettyWebServer(String contextPath, HttpServer httpServer,
ReactorHttpHandlerAdapter handlerAdapter,
Duration lifecycleTimeout) {
return new ArkNettyWebServer(httpServer, handlerAdapter, lifecycleTimeout);
return new ArkNettyWebServer(contextPath, httpServer, handlerAdapter, lifecycleTimeout);
}

private HttpServer initEmbedNetty(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,26 @@ public class ArkNettyWebServer implements WebServer {
private static final Predicate<HttpServerRequest> ALWAYS = (request) -> {
return true;
};
private HttpServer arkHttpServer;
private static HttpServer arkHttpServer;
private static final Log logger = LogFactory.getLog(ArkNettyWebServer.class);
private final HttpServer httpServer;
private final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler;
private final Duration lifecycleTimeout;
private List<NettyRouteProvider> routeProviders = Collections.emptyList();
private volatile DisposableServer disposableServer;
private static volatile DisposableServer disposableServer;
private Thread awaitThread;
private String contextPath;

public ArkNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout) {
public ArkNettyWebServer(String contextPath, HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout) {
Assert.notNull(httpServer, "HttpServer must not be null");
Assert.notNull(handlerAdapter, "HandlerAdapter must not be null");
this.contextPath = contextPath;
this.lifecycleTimeout = lifecycleTimeout;
this.handler = handlerAdapter;
this.httpServer = httpServer.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()));

}

public ArkNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout, HttpServer arkhttpServer){
this(httpServer, handlerAdapter, lifecycleTimeout);
this.arkHttpServer = arkHttpServer;

if (arkHttpServer == null) {
arkHttpServer = this.httpServer;
}
}

public void setRouteProviders(List<NettyRouteProvider> routeProviders) {
Expand All @@ -77,9 +75,9 @@ public void setRouteProviders(List<NettyRouteProvider> routeProviders) {

@Override
public void start() throws WebServerException {
if (this.disposableServer == null) {
if (disposableServer == null) {
try {
this.disposableServer = this.startHttpServer();
disposableServer = this.startHttpServer();
} catch (Exception var2) {
PortInUseException.ifCausedBy(var2, ChannelBindException.class, (bindException) -> {
if (bindException.localPort() > 0 && !this.isPermissionDenied(bindException.getCause())) {
Expand All @@ -89,36 +87,46 @@ public void start() throws WebServerException {
throw new WebServerException("Unable to start Netty", var2);
}

if (this.disposableServer != null) {
logger.info("Netty started" + this.getStartedOnMessage(this.disposableServer));
}
this.startDaemonAwaitThread(disposableServer);
}

this.startDaemonAwaitThread(this.disposableServer);
if (disposableServer != null) {
logger.info("Netty started" + this.getStartedOnMessage(disposableServer) + " with context path " + contextPath);
}
}

@Override
public void stop() throws WebServerException {
if (this.disposableServer != null && arkHttpServer != httpServer) {
if (!(this.handler instanceof ArkCompositeReactorHttpHandlerAdapter)) {
return;
}

((ArkCompositeReactorHttpHandlerAdapter) this.handler).unregisterBizReactorHttpHandlerAdapter(contextPath);

if (disposableServer != null && this.httpServer == arkHttpServer) {
try {
if (this.lifecycleTimeout != null) {
this.disposableServer.disposeNow(this.lifecycleTimeout);
disposableServer.disposeNow(this.lifecycleTimeout);
} else {
this.disposableServer.disposeNow();
disposableServer.disposeNow();
}
awaitThread.stop();
} catch (IllegalStateException var2) {
} catch (IllegalStateException ignore) {

}

this.disposableServer = null;
logger.info("Netty stoped" + this.getStartedOnMessage(disposableServer));

disposableServer = null;
}

}

@Override
public int getPort() {
if (this.disposableServer != null) {
if (disposableServer != null) {
try {
return this.disposableServer.port();
return disposableServer.port();
} catch (UnsupportedOperationException var2) {
return -1;
}
Expand Down Expand Up @@ -171,8 +179,6 @@ private boolean isPermissionDenied(Throwable bindExceptionCause) {
return false;
}



private void applyRouteProviders(HttpServerRoutes routes) {
NettyRouteProvider provider;
for(Iterator var2 = this.routeProviders.iterator(); var2.hasNext(); routes = (HttpServerRoutes)provider.apply(routes)) {
Expand All @@ -183,7 +189,7 @@ private void applyRouteProviders(HttpServerRoutes routes) {
}

private void startDaemonAwaitThread(DisposableServer disposableServer) {
awaitThread = new Thread("server") {
awaitThread = new Thread("server") {
public void run() {
disposableServer.onDispose().block();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ com.alipay.sofa.ark.springboot.listener.ArkApplicationStartListener,\
com.alipay.sofa.ark.springboot.listener.ArkDeployStaticBizListener,\
com.alipay.sofa.ark.springboot.listener.PropertiesResetListener
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alipay.sofa.ark.springboot.ArkAutoConfiguration,\
com.alipay.sofa.ark.springboot.ArkServletAutoConfiguration,\
com.alipay.sofa.ark.springboot.ArkReactiveAutoConfiguration,\
com.alipay.sofa.ark.springboot.ArkAutoProcessorConfiguration
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
com.alipay.sofa.ark.springboot.ArkAutoConfiguration
com.alipay.sofa.ark.springboot.ArkServletAutoConfiguration
com.alipay.sofa.ark.springboot.ArkReactiveAutoConfiguration
com.alipay.sofa.ark.springboot.ArkAutoProcessorConfiguration
Loading
Loading