Skip to content

Commit

Permalink
[Improve][Zeta] Jetty server enable dynamic port (#7903)
Browse files Browse the repository at this point in the history
  • Loading branch information
corgy-w authored Oct 27, 2024
1 parent d1582af commit 23cc78b
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 6 deletions.
1 change: 1 addition & 0 deletions config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ seatunnel:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
7 changes: 5 additions & 2 deletions docs/en/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ completed jobs. The monitoring API is a RESTful API that accepts HTTP requests a
## Overview

The v2 version of the api uses jetty support. It is the same as the interface specification of v1 version
, you can specify the port and context-path by modifying the configuration items in `seatunnel.yaml`

, you can specify the port and context-path by modifying the configuration items in `seatunnel.yaml`,
you can configure `enable-dynamic-port` to enable dynamic ports (the default port is accumulated starting from `port`), and the default is closed,
If enable-dynamic-port is true, We will use the unused port in the range within the range of `port` and `port` + `port-range`, default range is 100

```yaml

Expand All @@ -20,6 +21,8 @@ seatunnel:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
port-range: 100
```
Context-path can also be configured as follows:
Expand Down
7 changes: 6 additions & 1 deletion docs/zh/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ SeaTunnel有一个用于监控的API,可用于查询运行作业的状态和

## 概述

v2版本的api使用jetty支持,与v1版本的接口规范相同 ,可以通过修改`seatunnel.yaml`中的配置项来指定端口和context-path
v2版本的api使用jetty支持,与v1版本的接口规范相同 ,可以通过修改`seatunnel.yaml`中的配置项来指定端口和context-path,
同时可以配置 `enable-dynamic-port` 开启动态端口(默认从 `port` 开始累加),默认为关闭,
如果`enable-dynamic-port``true`,我们将使用`port``port`+`port-range`范围内未使用的端口,默认范围是100。

```yaml

seatunnel:
engine:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
port-range: 100
```
同时也可以配置context-path,配置如下:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ void beforeClass() throws Exception {
node2Tags.setAttribute("node", "node2");
Config node2hzconfig = node1Config.getHazelcastConfig().setMemberAttributeConfig(node2Tags);
node2Config = ConfigProvider.locateAndGetSeaTunnelConfig();
node2Config.getEngineConfig().getHttpConfig().setPort(8081);
// Dynamically generated port
node2Config.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
node2Config.getEngineConfig().getHttpConfig().setEnabled(true);
node2Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
node2Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ seatunnel:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ private HttpConfig parseHttpConfig(Node httpNode) {
httpConfig.setContextPath(getTextContent(node));
} else if (ServerConfigOptions.ENABLE_HTTP.key().equals(name)) {
httpConfig.setEnabled(getBooleanValue(getTextContent(node)));
} else if (ServerConfigOptions.ENABLE_DYNAMIC_PORT.key().equals(name)) {
httpConfig.setEnableDynamicPort(getBooleanValue(getTextContent(node)));
} else if (ServerConfigOptions.PORT_RANGE.key().equals(name)) {
httpConfig.setPortRange(
getIntegerValue(
ServerConfigOptions.PORT_RANGE.key(), getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public class HttpConfig implements Serializable {

private String contextPath = ServerConfigOptions.CONTEXT_PATH.defaultValue();

private boolean enableDynamicPort = ServerConfigOptions.ENABLE_DYNAMIC_PORT.defaultValue();

private int portRange = ServerConfigOptions.PORT_RANGE.defaultValue();

public void setPort(int port) {
checkPositive(port, ServerConfigOptions.HTTP + " must be > 0");
this.port = port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,20 @@ public class ServerConfigOptions {
.defaultValue("")
.withDescription("The context path of the http server.");

public static final Option<Boolean> ENABLE_DYNAMIC_PORT =
Options.key("enable-dynamic-port")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable the dynamic port of the http server. If true, We will use the unused port");

public static final Option<Integer> PORT_RANGE =
Options.key("port-range")
.intType()
.defaultValue(100)
.withDescription(
"The port range of the http server. If enable-dynamic-port is true, We will use the unused port in the range");

public static final Option<HttpConfig> HTTP =
Options.key("http")
.type(new TypeReference<HttpConfig>() {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ public void testSeaTunnelConfig() {
.getStorage()
.getStoragePluginConfig()
.get("fs.defaultFS"));

Assertions.assertFalse(
config.getEngineConfig().getTelemetryConfig().getMetric().isEnabled());
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnabled());
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort());
Assertions.assertEquals(8080, config.getEngineConfig().getHttpConfig().getPort());
Assertions.assertEquals(200, config.getEngineConfig().getHttpConfig().getPortRange());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ seatunnel:
namespace: /tmp/seatunnel/checkpoint_snapshot
storage.type: hdfs
fs.defaultFS: file:/// # Ensure that the directory has written permission

telemetry:
metric:
enabled: false
http:
enable-http: true
port: 8080
enable-dynamic-port: true
port-range: 200
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@

import javax.servlet.DispatcherType;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.URL;
import java.util.EnumSet;

Expand All @@ -71,14 +74,22 @@
/** The Jetty service for SeaTunnel engine server. */
@Slf4j
public class JettyService {

private NodeEngineImpl nodeEngine;
private SeaTunnelConfig seaTunnelConfig;
Server server;

public JettyService(NodeEngineImpl nodeEngine, SeaTunnelConfig seaTunnelConfig) {
this.nodeEngine = nodeEngine;
this.seaTunnelConfig = seaTunnelConfig;
this.server = new Server(seaTunnelConfig.getEngineConfig().getHttpConfig().getPort());
int port = seaTunnelConfig.getEngineConfig().getHttpConfig().getPort();
if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnableDynamicPort()) {
port =
chooseAppropriatePort(
port, seaTunnelConfig.getEngineConfig().getHttpConfig().getPortRange());
}
log.info("SeaTunnel REST service will start on port {}", port);
this.server = new Server(port);
}

public void createJettyServer() {
Expand Down Expand Up @@ -166,4 +177,26 @@ public void shutdownJettyServer() {
private static String convertUrlToPath(String url) {
return url + "/*";
}

public int chooseAppropriatePort(int initialPort, int portRange) {
int port = initialPort;

while (port <= initialPort + portRange) {
if (!isPortInUse(port)) {
return port;
}
port++;
}

throw new RuntimeException("Jetty failed to start, No available port found in the range!");
}

private boolean isPortInUse(int port) {
try (ServerSocket ss = new ServerSocket(port);
DatagramSocket ds = new DatagramSocket(port)) {
return false;
} catch (IOException e) {
return true;
}
}
}

0 comments on commit 23cc78b

Please sign in to comment.