diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 9c2e5ee0d4..aa8d7bafb3 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -21,6 +21,12 @@ eventMesh.server.provide.protocols=HTTP,TCP,GRPC eventMesh.server.cluster=COMMON eventMesh.server.name=EVENTMESH-runtime eventMesh.sysid=0000 +eventMesh.server.protocol.unified.port=10000 +eventMesh.server.protocol.http.enabled=true +eventMesh.server.protocol.grpc.enabled=true +eventMesh.server.protocol.tcp.enabled=true + +# Legacy port configurations - to be deprecated eventMesh.server.tcp.port=10000 eventMesh.server.http.port=10105 eventMesh.server.grpc.port=10205 diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 3fcca51832..3266cdcb6a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -175,9 +177,10 @@ public void start() throws Exception { .childHandler(new HttpsServerInitializer(useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null)) .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); - log.info("HTTPServer[port={}] started.", this.getPort()); + int port = eventMeshHttpConfiguration.getProtocolConfiguration().getUnifiedPort(); + log.info("HTTPServer[port={}] started.", port); - bootstrap.bind(this.getPort()) + bootstrap.bind(port) .channel() .closeFuture() .sync(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java index c9a464bf8a..47fa185fc0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java @@ -17,6 +17,26 @@ package org.apache.eventmesh.runtime.boot; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.handler.traffic.ChannelTrafficShapingHandler; +import io.netty.handler.traffic.GlobalTrafficShapingHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.SimpleChannelInboundHandler; +import io.opentelemetry.api.trace.Span; + import org.apache.eventmesh.common.Pair; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.protocol.tcp.Command; @@ -52,26 +72,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.handler.traffic.ChannelTrafficShapingHandler; -import io.netty.handler.traffic.GlobalTrafficShapingHandler; -import io.opentelemetry.api.trace.Span; - import lombok.extern.slf4j.Slf4j; /** @@ -143,7 +143,7 @@ public void start() throws Exception { .childHandler(new TcpServerInitializer()); try { - int port = eventMeshTCPConfiguration.getEventMeshTcpServerPort(); + int port = eventMeshTCPConfiguration.getProtocolConfiguration().getUnifiedPort(); ChannelFuture f = bootstrap.bind(port).sync(); log.info("EventMeshTCPServer[port={}] started.....", port); f.channel().closeFuture().sync(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java index 17165012d8..b10c2659d7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java @@ -116,7 +116,7 @@ public void init() throws Exception { grpcRetryer = new GrpcRetryer(this); - int serverPort = eventMeshGrpcConfiguration.getGrpcServerPort(); + int serverPort = eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort(); server = ServerBuilder.forPort(serverPort) .addService(new ConsumerService(this, sendMsgExecutor, replyMsgExecutor)) @@ -175,7 +175,7 @@ public boolean register() { boolean registerResult = false; try { String endPoints = IPUtils.getLocalAddress() - + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getGrpcServerPort(); + + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort(); EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo(); eventMeshRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.getEventMeshCluster()); eventMeshRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.getEventMeshName() + "-" @@ -192,7 +192,7 @@ public boolean register() { private void unRegister() throws Exception { String endPoints = IPUtils.getLocalAddress() - + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getGrpcServerPort(); + + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort(); EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo(); eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.getEventMeshCluster()); eventMeshUnRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.getEventMeshName()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java index 924a07ab01..0c707bbf1f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java @@ -33,8 +33,25 @@ public class EventMeshGrpcConfiguration extends CommonConfiguration { @ConfigField(field = "grpc.port", notNull = true, beNumber = true) + @Deprecated private int grpcServerPort = 10205; + @ConfigField(field = "protocol") + private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration(); + + public int getGrpcServerPort() { + return protocolConfiguration.getGrpcPort(); + } + + public void setGrpcServerPort(int port) { + this.grpcServerPort = port; + this.protocolConfiguration.setGrpcPort(port); + } + + public boolean isGrpcEnabled() { + return protocolConfiguration.isGrpcEnabled(); + } + @ConfigField(field = "session.expiredInMills") private int eventMeshSessionExpiredInMills = 60000; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index 287c222245..cf92513bd3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -24,11 +24,13 @@ import java.util.Collections; import java.util.List; +import inet.ipaddr.IPAddress; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import inet.ipaddr.IPAddress; +import org.apache.eventmesh.runtime.configuration.ProtocolConfiguration; @Data @EqualsAndHashCode(callSuper = true) @@ -37,8 +39,28 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { @ConfigField(field = "http.port", notNull = true, beNumber = true) + @Deprecated private int httpServerPort = 10105; + @ConfigField(field = "protocol") + private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration(); + + public int getHttpServerPort() { + return protocolConfiguration.getHttpPort(); + } + + public void setHttpServerPort(int port) { + this.httpServerPort = port; + this.protocolConfiguration.setHttpPort(port); + } + + public boolean isHttpEnabled() { + return protocolConfiguration.isHttpEnabled(); + } + + @ConfigField(field = "http.path") + private String eventMeshServerHttpPath = "/eventmesh"; + @ConfigField(field = "batchmsg.batch.enabled") private boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java index 907d80f686..0cb69b19b8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java @@ -33,8 +33,25 @@ public class EventMeshTCPConfiguration extends CommonConfiguration { @ConfigField(field = "tcp.port") + @Deprecated private int eventMeshTcpServerPort = 10000; + @ConfigField(field = "protocol") + private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration(); + + public int getEventMeshTcpServerPort() { + return protocolConfiguration.getTcpPort(); + } + + public void setEventMeshTcpServerPort(int port) { + this.eventMeshTcpServerPort = port; + this.protocolConfiguration.setTcpPort(port); + } + + public boolean isTcpEnabled() { + return protocolConfiguration.isTcpEnabled(); + } + @ConfigField(field = "tcp.allIdleSeconds") private int eventMeshTcpIdleAllSeconds = 60; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/ProtocolConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/ProtocolConfiguration.java new file mode 100644 index 0000000000..d1ad61a819 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/ProtocolConfiguration.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.configuration; + +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigField; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@Config(prefix = "eventMesh.server.protocol") +public class ProtocolConfiguration { + + @ConfigField(field = "unified.port", notNull = true, beNumber = true) + private int unifiedPort = 10000; + + @ConfigField(field = "http.enabled") + private boolean httpEnabled = true; + + @ConfigField(field = "grpc.enabled") + private boolean grpcEnabled = true; + + @ConfigField(field = "tcp.enabled") + private boolean tcpEnabled = true; + + public int getHttpPort() { + return unifiedPort; + } + + public int getGrpcPort() { + return unifiedPort; + } + + public int getTcpPort() { + return unifiedPort; + } + + public void setHttpPort(int port) { + this.unifiedPort = port; + } + + public void setGrpcPort(int port) { + this.unifiedPort = port; + } + + public void setTcpPort(int port) { + this.unifiedPort = port; + } +}