diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java index 792307e6ae..80d67f2281 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -97,8 +97,8 @@ private void initConnection() throws Exception { new AirflowConnectionGetter(airflowConfig.getConnectionId())); if (!response.isSuccess()) { AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "", - airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, - airflowConfig.getPort(), airflowConfig.getInlongPassword(), ""); + airflowConfig.getInlongManagerHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, + airflowConfig.getInlongManagerPort(), airflowConfig.getInlongPassword(), ""); response = serverClient.sendRequest(new AirflowConnectionCreator(newConn)); LOGGER.info("AirflowConnection registration response: {}", response.toString()); if (!response.isSuccess()) { diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java index 489712abe9..9e7ffbf9d0 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java @@ -27,10 +27,17 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import okhttp3.OkHttpClient; +import org.eclipse.jetty.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import javax.annotation.PostConstruct; + +import java.net.URL; + @Data @Configuration @NoArgsConstructor @@ -38,11 +45,12 @@ @EqualsAndHashCode(callSuper = true) public class AirflowConfig extends ClientConfiguration { - @Value("${schedule.engine.inlong.manager.host:127.0.0.1}") - private String host; + private static final Logger LOGGER = LoggerFactory.getLogger(AirflowConfig.class); + @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}") + private String inlongManagerUrl; - @Value("${server.port:8083}") - private int port; + private String inlongManagerHost; + private int inlongManagerPort; @Value("${default.admin.user:admin}") private String inlongUsername; @@ -68,6 +76,23 @@ public class AirflowConfig extends ClientConfiguration { @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}") private String baseUrl; + @PostConstruct + public void init() { + try { + if (StringUtil.isNotBlank(inlongManagerUrl)) { + URL url = new URL(inlongManagerUrl); + this.inlongManagerHost = url.getHost(); + this.inlongManagerPort = url.getPort(); + if (this.inlongManagerPort == -1) { + this.inlongManagerPort = 8083; + } + } + LOGGER.info("Init AirflowConfig success for manager url ={}", this.inlongManagerUrl); + } catch (Exception e) { + LOGGER.error("Init AirflowConfig failed for manager url={}: ", this.inlongManagerUrl, e); + } + } + @Bean public OkHttpClient okHttpClient() { return new OkHttpClient.Builder() @@ -79,6 +104,7 @@ public OkHttpClient okHttpClient() { .retryOnConnectionFailure(true) .build(); } + @Bean public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) { return new AirflowServerClient(okHttpClient, airflowConfig); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java index 5123068eab..c2d6ef0094 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -56,11 +56,8 @@ public class DolphinScheduleEngine implements ScheduleEngine { private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); - @Value("${schedule.engine.inlong.manager.host:127.0.0.1}") - private String host; - - @Value("${server.port:8083}") - private int port; + @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}") + private String inlongManagerUrl; @Value("${default.admin.user:admin}") private String username; @@ -86,10 +83,10 @@ public void init() { this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; } - public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + public DolphinScheduleEngine(String inlongManagerUrl, String username, String password, + String dolphinUrl, String token) { - this.host = host; - this.port = port; + this.inlongManagerUrl = inlongManagerUrl; this.username = username; this.password = password; this.dolphinUrl = dolphinUrl; @@ -161,8 +158,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) { long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo); processDefCode = dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, - host, port, - username, password, offset, scheduleInfo.getInlongGroupId()); + inlongManagerUrl, username, password, offset, scheduleInfo.getInlongGroupId()); LOGGER.info("Create process definition success, process definition code: {}", processDefCode); if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java index e317478c64..8a7d9cbe2b 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java @@ -92,11 +92,11 @@ public long genTaskCode(String url, String token) { /** * Creates a process definition in DolphinScheduler. */ - public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host, - int port, String username, String password, long offset, String groupId) { + public long createProcessDef(String url, String token, String name, String desc, long taskCode, + String inlongManagerUrl, String username, String password, long offset, String groupId) { try { - return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host, - port, username, password, offset, groupId); + return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, inlongManagerUrl, username, + password, offset, groupId); } catch (Exception e) { LOGGER.error("Unexpected wrong in creating process definition: ", e); throw new DolphinScheduleException(UNEXPECTED_ERROR, diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java index 5fd6dd3629..ee28c6973f 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -282,22 +282,21 @@ public static long genTaskCode(String url, String token) { /** * Creates a process definition in DolphinScheduler. * - * @param url The base URL of the DolphinScheduler API. - * @param token The authentication token to be used in the request header. - * @param name The name of the process definition. - * @param desc The description of the process definition. - * @param taskCode The task code to be associated with this process definition. - * @param host The host where the process will run. - * @param port The port where the process will run. - * @param username The username for authentication. - * @param password The password for authentication. - * @param offset The offset for the scheduling. - * @param groupId The group ID of the process. + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param name The name of the process definition. + * @param desc The description of the process definition. + * @param taskCode The task code to be associated with this process definition. + * @param inlongManagerUrl The host where the process will run. + * @param username The username for authentication. + * @param password The password for authentication. + * @param offset The offset for the scheduling. + * @param groupId The group ID of the process. * @return The process definition code (ID) if creation is successful, or 0 if an error occurs. */ public static long createProcessDef(String url, String token, String name, String desc, - long taskCode, String host, - int port, String username, String password, long offset, String groupId) throws Exception { + long taskCode, String inlongManagerUrl, String username, String password, long offset, String groupId) + throws Exception { try { Map header = buildHeader(token); @@ -306,7 +305,7 @@ public static long createProcessDef(String url, String token, String name, Strin String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation)); DSTaskParams taskParams = new DSTaskParams(); - taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId)); + taskParams.setRawScript(buildScript(inlongManagerUrl, username, password, offset, groupId)); DSTaskDefinition taskDefinition = new DSTaskDefinition(); taskDefinition.setCode(taskCode); @@ -774,10 +773,10 @@ private static JsonObject executeHttpRequest(String url, String method, Map