From 042e56862440422ca0e6f045996908ed6a54a8ba Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:27:58 +0800 Subject: [PATCH] [INLONG-11401][Manager] Support Dolphinscheduler schedule engine (#11468) --- .../dolphinschedule/DSTaskDefinition.java | 120 +++ .../dolphinschedule/DSTaskParams.java | 46 + .../dolphinschedule/DSTaskRelation.java | 59 ++ .../dolphinschedule/DScheduleInfo.java | 43 + inlong-manager/manager-schedule/pom.xml | 30 + .../manager/schedule/ScheduleEngineType.java | 3 +- .../DolphinScheduleClient.java | 58 ++ .../DolphinScheduleConstants.java | 76 ++ .../DolphinScheduleEngine.java | 268 ++++++ .../DolphinScheduleOperator.java | 173 ++++ .../DolphinScheduleUtils.java | 790 ++++++++++++++++++ .../exception/DolphinScheduleException.java | 105 +++ .../DolphinScheduleContainerTestEnv.java | 170 ++++ .../DolphinScheduleEngineTest.java | 127 +++ ...DolphinSchedulerContainerEnvConstants.java | 51 ++ .../main/resources/application-dev.properties | 10 +- .../resources/application-prod.properties | 11 +- .../resources/application-test.properties | 11 +- 18 files changed, 2136 insertions(+), 15 deletions(-) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java create mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java create mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java create mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java create mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java create mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java create mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java create mode 100644 inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java create mode 100644 inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java create mode 100644 inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java new file mode 100644 index 00000000000..700638b63c9 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class DSTaskDefinition { + + @ApiModelProperty("DolphinScheduler task definition code") + @JsonProperty("code") + private long code; + + @ApiModelProperty("DolphinScheduler task definition code") + @JsonProperty("delayTime") + private String delayTime; + + @ApiModelProperty("DolphinScheduler task definition description") + @JsonProperty("description") + private String description; + + @ApiModelProperty("DolphinScheduler task definition environment code") + @JsonProperty("environmentCode") + private int environmentCode; + + @ApiModelProperty("DolphinScheduler task fail retry interval") + @JsonProperty("failRetryInterval") + private String failRetryInterval; + + @ApiModelProperty("DolphinScheduler task definition fail retry times") + @JsonProperty("failRetryTimes") + private String failRetryTimes; + + @ApiModelProperty("DolphinScheduler task definition flag") + @JsonProperty("flag") + private String flag; + + @ApiModelProperty("DolphinScheduler task definition isCache") + @JsonProperty("isCache") + private String isCache; + + @ApiModelProperty("DolphinScheduler task definition name") + @JsonProperty("name") + private String name; + + @ApiModelProperty("DolphinScheduler task definition params") + @JsonProperty("taskParams") + private DSTaskParams taskParams; + + @ApiModelProperty("DolphinScheduler task definition priority") + @JsonProperty("taskPriority") + private String taskPriority; + + @ApiModelProperty("DolphinScheduler task definition type") + @JsonProperty("taskType") + private String taskType; + + @ApiModelProperty("DolphinScheduler task definition timeout") + @JsonProperty("timeout") + private int timeout; + + @ApiModelProperty("DolphinScheduler task definition timeout flag") + @JsonProperty("timeoutFlag") + private String timeoutFlag; + + @ApiModelProperty("DolphinScheduler task definition timeout notify strategy") + @JsonProperty("timeoutNotifyStrategy") + private String timeoutNotifyStrategy; + + @ApiModelProperty("DolphinScheduler task definition worker group") + @JsonProperty("workerGroup") + private String workerGroup; + + @ApiModelProperty("DolphinScheduler task definition apu quota") + @JsonProperty("cpuQuota") + private int cpuQuota; + + @ApiModelProperty("DolphinScheduler task definition memory max") + @JsonProperty("memoryMax") + private int memoryMax; + + @ApiModelProperty("DolphinScheduler task definition execute type") + @JsonProperty("taskExecuteType") + private String taskExecuteType; + + public DSTaskDefinition() { + this.delayTime = "0"; + this.description = ""; + this.environmentCode = -1; + this.failRetryInterval = "1"; + this.failRetryTimes = "0"; + this.flag = "YES"; + this.isCache = "NO"; + this.taskPriority = "MEDIUM"; + this.taskType = "SHELL"; + this.timeoutFlag = "CLOSE"; + this.timeoutNotifyStrategy = ""; + this.workerGroup = "default"; + this.cpuQuota = -1; + this.memoryMax = -1; + this.taskExecuteType = "BATCH"; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java new file mode 100644 index 00000000000..a5344f5facf --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; +@Data +public class DSTaskParams { + + @ApiModelProperty("DolphinScheduler task params local params") + @JsonProperty("localParams") + private List localParams; + + @ApiModelProperty("DolphinScheduler task params raw script") + @JsonProperty("rawScript") + private String rawScript; + + @ApiModelProperty("DolphinScheduler task params resource list") + @JsonProperty("resourceList") + private List resourceList; + + public DSTaskParams() { + this.localParams = new ArrayList<>(); + this.resourceList = new ArrayList<>(); + this.rawScript = ""; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java new file mode 100644 index 00000000000..e853317df77 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class DSTaskRelation { + + @ApiModelProperty("DolphinScheduler task relation name") + @JsonProperty("name") + private String name; + + @ApiModelProperty("DolphinScheduler task relation pre-task code") + @JsonProperty("preTaskCode") + private int preTaskCode; + + @ApiModelProperty("DolphinScheduler task relation pre-task version") + @JsonProperty("preTaskVersion") + private int preTaskVersion; + + @ApiModelProperty("DolphinScheduler task relation post-task code") + @JsonProperty("postTaskCode") + private long postTaskCode; + + @ApiModelProperty("DolphinScheduler task relation post-task version") + @JsonProperty("postTaskVersion") + private int postTaskVersion; + + @ApiModelProperty("DolphinScheduler task relation condition type") + @JsonProperty("conditionType") + private String conditionType; + + @ApiModelProperty("DolphinScheduler task relation condition params") + @JsonProperty("conditionParams") + private Object conditionParams; + + public DSTaskRelation() { + this.name = ""; + this.conditionType = "NONE"; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java new file mode 100644 index 00000000000..ac45b26f197 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class DScheduleInfo { + + @ApiModelProperty("DolphinScheduler schedule start time") + @JsonProperty("startTime") + private String startTime; + + @ApiModelProperty("DolphinScheduler schedule end time") + @JsonProperty("endTime") + private String endTime; + + @ApiModelProperty("DolphinScheduler schedule crontab expression") + @JsonProperty("crontab") + private String crontab; + + @ApiModelProperty("DolphinScheduler schedule timezone id") + @JsonProperty("timezoneId") + private String timezoneId; + +} diff --git a/inlong-manager/manager-schedule/pom.xml b/inlong-manager/manager-schedule/pom.xml index a9d9fb3e1ed..82632a1aff8 100644 --- a/inlong-manager/manager-schedule/pom.xml +++ b/inlong-manager/manager-schedule/pom.xml @@ -73,5 +73,35 @@ junit-jupiter test + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.springframework.boot + spring-boot-test + ${spring.boot.version} + test + + + org.springframework.boot + spring-boot-starter-test + + + com.vaadin.external.google + android-json + + + com.jayway.jsonpath + json-path + + + org.junit.jupiter + junit-jupiter-api + + + diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java index 71949ef7445..ac71e4e2d13 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java @@ -23,7 +23,8 @@ public enum ScheduleEngineType { NONE("None"), - QUARTZ("Quartz"); + QUARTZ("Quartz"), + DOLPHINSCHEDULER("DolphinScheduler"); private final String type; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java new file mode 100644 index 00000000000..a75085b9ac9 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.schedule.ScheduleEngineType; + +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * Built-in implementation of third-party schedule engine client corresponding with {@link DolphinScheduleEngine}. + * DolphinScheduleClient simply invokes the {@link DolphinScheduleEngine} to register/unregister/update + * schedule info, all the logic for invoking the remote scheduling service is implemented in {@link DolphinScheduleEngine} + */ +@Service +public class DolphinScheduleClient implements ScheduleEngineClient { + + @Resource + public DolphinScheduleEngine scheduleEngine; + + @Override + public boolean accept(String engineType) { + return ScheduleEngineType.DOLPHINSCHEDULER.getType().equalsIgnoreCase(engineType); + } + + @Override + public boolean register(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleRegister(scheduleInfo); + } + + @Override + public boolean unregister(String groupId) { + return scheduleEngine.handleUnregister(groupId); + } + + @Override + public boolean update(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleUpdate(scheduleInfo); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java new file mode 100644 index 00000000000..89dcda5b779 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +public class DolphinScheduleConstants { + + // DS public constants + public static final String DS_ID = "id"; + public static final String DS_CODE = "code"; + public static final String DS_TOKEN = "token"; + public static final String DS_PAGE_SIZE = "pageSize"; + public static final String DS_PAGE_NO = "pageNo"; + public static final String DS_SEARCH_VAL = "searchVal"; + public static final String DS_RESPONSE_DATA = "data"; + public static final String DS_RESPONSE_NAME = "name"; + public static final String DS_RESPONSE_TOTAL_LIST = "totalList"; + public static final String DS_DEFAULT_PAGE_SIZE = "10"; + public static final String DS_DEFAULT_PAGE_NO = "1"; + public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai"; + + // DS project related constants + public static final String DS_PROJECT_URL = "/projects"; + public static final String DS_PROJECT_NAME = "projectName"; + public static final String DS_PROJECT_DESC = "description"; + public static final String DS_DEFAULT_PROJECT_NAME = "default_inlong_offline_scheduler"; + public static final String DS_DEFAULT_PROJECT_DESC = "default scheduler project for inlong offline job"; + + // DS task related constants + public static final String DS_TASK_CODE_URL = "/task-definition/gen-task-codes"; + public static final String DS_TASK_RELATION = "taskRelationJson"; + public static final String DS_TASK_DEFINITION = "taskDefinitionJson"; + public static final String DS_TASK_GEN_NUM = "genNum"; + public static final String DS_DEFAULT_TASK_GEN_NUM = "1"; + public static final String DS_DEFAULT_TASK_NAME = "default-inlong-http-callback"; + public static final String DS_DEFAULT_TASK_DESC = "default http request using shell script callbacks to inlong"; + + // DS process definition related constants + public static final String DS_PROCESS_URL = "/process-definition"; + public static final String DS_PROCESS_QUERY_URL = "/query-process-definition-list"; + public static final String DS_PROCESS_NAME = "name"; + public static final String DS_PROCESS_DESC = "description"; + public static final String DS_PROCESS_CODE = "processDefinitionCode"; + public static final String DS_DEFAULT_PROCESS_NAME = "_inlong_offline_process_definition"; + public static final String DS_DEFAULT_PROCESS_DESC = "scheduler process definition for inlong group: "; + + // DS release related constants + public static final String DS_RELEASE_URL = "/release"; + public static final String DS_RELEASE_STATE = "releaseState"; + + // DS schedule related constants + public static final String DS_SCHEDULE_URL = "/schedules"; + public static final String DS_SCHEDULE_DEF = "schedule"; + public static final String DS_DEFAULT_SCHEDULE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + // DS online/offline related constants + public static final String DS_ONLINE_URL = "/online"; + public static final String DS_ONLINE_STATE = "ONLINE"; + public static final String DS_OFFLINE_URL = "/offline"; + public static final String DS_OFFLINE_STATE = "OFFLINE"; + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java new file mode 100644 index 00000000000..7b09481ceac --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${schedule.engine.dolphinscheduler.url:http://127.0.0.1:12345/dolphinscheduler}") + private String dolphinUrl; + + @Value("${schedule.engine.dolphinscheduler.token:default_token_value}") + private String token; + + @Resource + private DolphinScheduleOperator dolphinScheduleOperator; + + private long projectCode; + private String projectBaseUrl; + private final Map scheduledProcessMap; + + @PostConstruct + public void init() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + } + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } + + public DolphinScheduleEngine() { + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine, Checking project exists..."); + long code = dolphinScheduleOperator.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dolphinScheduleOperator.creatProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, + DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. + * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}, Checking process definition id uniqueness...", + scheduleInfo.getInlongGroupId()); + try { + long processDefCode = dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName); + + boolean online = false; + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + // process definition already exists, delete and rebuild + LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode); + if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + } + String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL; + + long taskCode = dolphinScheduleOperator.genTaskCode(taskCodeUrl, token); + LOGGER.info("Generate task code for process definition success, task code: {}", taskCode); + + long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo); + processDefCode = + dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, + host, port, + username, password, offset, scheduleInfo.getInlongGroupId()); + LOGGER.info("Create process definition success, process definition code: {}", processDefCode); + + if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { + LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE); + + int scheduleId = dolphinScheduleOperator.createScheduleForProcessDef(scheduleUrl, processDefCode, token, + scheduleInfo); + LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo); + + online = dolphinScheduleOperator.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + LOGGER.info("Online schedule for process definition, status: {}", online); + } + + scheduledProcessMap.putIfAbsent(processDefCode, processName); + return online; + } catch (Exception e) { + LOGGER.error("Failed to handle unregister dolphin scheduler: ", e); + throw new DolphinScheduleException( + String.format("Failed to handle unregister dolphin scheduler: %s", e.getMessage())); + } + } + + /** + * Handle schedule unregister. + * @param groupId group to un-register schedule info + */ + @Override + @VisibleForTesting + public boolean handleUnregister(String groupId) { + String processName = groupId + DS_DEFAULT_PROCESS_NAME; + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + + LOGGER.info("Dolphin Scheduler handle Unregister begin for {}, Checking process definition id uniqueness...", + groupId); + try { + long processDefCode = dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName); + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + LOGGER.info("Deleting process definition, process definition id: {}", processDefCode); + if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + + dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + LOGGER.info("Process definition deleted"); + } + } + LOGGER.info("Un-registered dolphin schedule info for {}", groupId); + return !scheduledProcessMap.containsKey(processDefCode); + } catch (Exception e) { + LOGGER.error("Failed to handle unregister dolphin scheduler: ", e); + throw new DolphinScheduleException( + String.format("Failed to handle unregister dolphin scheduler: %s", e.getMessage())); + } + } + + /** + * Handle schedule update. + * @param scheduleInfo schedule info to update + */ + @Override + @VisibleForTesting + public boolean handleUpdate(ScheduleInfo scheduleInfo) { + LOGGER.info("Update dolphin schedule info for {}", scheduleInfo.getInlongGroupId()); + try { + return handleUnregister(scheduleInfo.getInlongGroupId()) && handleRegister(scheduleInfo); + } catch (Exception e) { + LOGGER.error("Failed to handle update dolphin scheduler: ", e); + throw new DolphinScheduleException( + String.format("Failed to handle update dolphin scheduler: %s", e.getMessage())); + } + } + + /** + * stop and delete all process definition in DolphinScheduler + * remove all process stored in scheduledProcessMap + * delete project for inlong-group-id in DolphinScheduler + */ + @Override + public void stop() { + LOGGER.info("Stopping dolphin scheduler engine..."); + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + try { + + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + Map allProcessDef = dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl, token); + + for (Long processDefCode : allProcessDef.keySet()) { + + LOGGER.info("delete process definition id: {}", processDefCode); + dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE); + dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + + dolphinScheduleOperator.deleteProject(projectBaseUrl, token, projectCode); + LOGGER.info("Dolphin scheduler engine stopped"); + + } catch (Exception e) { + LOGGER.error("Failed to stop dolphin scheduler: ", e); + throw new DolphinScheduleException(String.format("Failed to stop dolphin scheduler: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java new file mode 100644 index 00000000000..e317478c645 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.Map; + +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR; + +/** + * DolphinScheduler operator, This class includes methods for creating, updating, and deleting projects, + * tasks, and process definitions in DolphinScheduler. + */ +@Service +public class DolphinScheduleOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleOperator.class); + + /** + * Checks the uniqueness of a DolphinScheduler project ID based on the given search value. + */ + public long checkAndGetUniqueId(String url, String token, String searchVal) { + try { + return DolphinScheduleUtils.checkAndGetUniqueId(url, token, searchVal); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in check id uniqueness: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in check id uniqueness: %s", e.getMessage())); + } + } + + /** + * Creates a new project in DolphinScheduler. + */ + public long creatProject(String url, String token, String projectName, String description) { + try { + return DolphinScheduleUtils.creatProject(url, token, projectName, description); + } catch (Exception e) { + LOGGER.error("Unexpected error while creating new project: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected error while creating new project: %s", e.getMessage())); + } + } + + /** + * Query all process definition in DolphinScheduler project. + */ + public Map queryAllProcessDef(String url, String token) { + try { + return DolphinScheduleUtils.queryAllProcessDef(url, token); + } catch (Exception e) { + LOGGER.error("Unexpected error while querying process definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected error while querying process definition: %s", e.getMessage())); + } + } + + /** + * Generates a new task code in DolphinScheduler. + */ + public long genTaskCode(String url, String token) { + try { + return DolphinScheduleUtils.genTaskCode(url, token); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in generating task code: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in generating task code: %s", e.getMessage())); + } + } + + /** + * Creates a process definition in DolphinScheduler. + */ + public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host, + int port, String username, String password, long offset, String groupId) { + try { + return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host, + port, username, password, offset, groupId); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in creating process definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in creating process definition: %s", e.getMessage())); + } + } + + /** + * Releases a process definition in DolphinScheduler. + */ + public boolean releaseProcessDef(String processDefUrl, long processDefCode, String token, String status) { + try { + return DolphinScheduleUtils.releaseProcessDef(processDefUrl, processDefCode, token, status); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in release process definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in release process definition: %s", e.getMessage())); + } + } + + /** + * Create a schedule for process definition in DolphinScheduler. + */ + public int createScheduleForProcessDef(String url, long processDefCode, String token, ScheduleInfo scheduleInfo) { + try { + return DolphinScheduleUtils.createScheduleForProcessDef(url, processDefCode, token, + scheduleInfo); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in creating schedule for process definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in creating schedule for process definition: %s", e.getMessage())); + } + } + + /** + * Online the schedule for process definition in DolphinScheduler. + */ + public boolean onlineScheduleForProcessDef(String scheduleUrl, int scheduleId, String token) { + try { + return DolphinScheduleUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in online process definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in online process definition: %s", e.getMessage())); + } + } + + /** + * Delete the process definition in DolphinScheduler. + */ + public void deleteProcessDef(String processDefUrl, String token, long processDefCode) { + try { + DolphinScheduleUtils.delete(processDefUrl, token, processDefCode); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in deleting process definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in deleting process definition: %s", e.getMessage())); + } + } + + /** + * Delete the project in DolphinScheduler. + */ + public void deleteProject(String projectBaseUrl, String token, long projectCode) { + try { + DolphinScheduleUtils.delete(projectBaseUrl, token, projectCode); + } catch (Exception e) { + LOGGER.error("Unexpected wrong in deleting project definition: ", e); + throw new DolphinScheduleException(UNEXPECTED_ERROR, + String.format("Unexpected wrong in deleting project definition: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java new file mode 100644 index 00000000000..87cb1c51277 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -0,0 +1,790 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.common.bounded.BoundaryType; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskDefinition; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskParams; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskRelation; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleUnit; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.core.util.CronExpression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_CODE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TOKEN; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.DELETION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.GEN_TASK_CODE_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.HTTP_REQUEST_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.INVALID_HTTP_METHOD; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_ONLINE_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNIQUE_CHECK_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNSUPPORTED_SCHEDULE_TYPE; + +/** + * DolphinScheduler utils + * A utility class for interacting with DolphinScheduler API. + */ +public class DolphinScheduleUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + private static final String POST = "POST"; + private static final String GET = "GET"; + private static final String DELETE = "DELETE"; + private static final long MILLIS_IN_SECOND = 1000L; + private static final long MILLIS_IN_MINUTE = 60 * MILLIS_IN_SECOND; + private static final long MILLIS_IN_HOUR = 60 * MILLIS_IN_MINUTE; + private static final long MILLIS_IN_DAY = 24 * MILLIS_IN_HOUR; + private static final long MILLIS_IN_WEEK = 7 * MILLIS_IN_DAY; + private static final long MILLIS_IN_MONTH = 30 * MILLIS_IN_DAY; + private static final long MILLIS_IN_YEAR = 365 * MILLIS_IN_DAY; + private static final String CONTENT_TYPE = "Content-Type: application/json; charset=utf-8"; + private static final String SHELL_REQUEST_API = "/inlong/manager/api/group/submitOfflineJob"; + private static final OkHttpClient CLIENT = new OkHttpClient(); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private DolphinScheduleUtils() { + } + + /** + * Checks the uniqueness of a project ID based on the given search value. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param searchVal The name of the project to search for. + * @return The unique project ID if found, or 0 if not found or an error occurs. + */ + public static long checkAndGetUniqueId(String url, String token, String searchVal) { + try { + Map header = buildHeader(token); + Map queryParams = buildPageParam(searchVal); + + JsonObject response = executeHttpRequest(url, GET, queryParams, header); + + JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject(); + JsonArray totalList = data.getAsJsonArray(DS_RESPONSE_TOTAL_LIST); + + // check uniqueness + if (totalList != null && totalList.size() == 1) { + JsonObject project = totalList.get(0).getAsJsonObject(); + String name = project.get(DS_RESPONSE_NAME).getAsString(); + if (name.equals(searchVal)) { + return project.get(DS_CODE).getAsLong(); + } + } + return 0; + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during checkAndGetUniqueId", e); + throw new DolphinScheduleException(JSON_PARSE_ERROR, + String.format("Error parsing json during unique ID check for: %s at URL: %s", searchVal, url), e); + + } catch (DolphinScheduleException e) { + LOGGER.error("DolphinScheduleException during unique ID check: {}", e.getDetailedMessage(), e); + throw new DolphinScheduleException(UNIQUE_CHECK_FAILED, + String.format("Error checking unique ID for %s at URL: %s", searchVal, url)); + } + } + + /** + * Creates a new project in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param projectName The name of the new project. + * @param description The description of the new project. + * @return The project code (ID) if creation is successful, or 0 if an error occurs. + */ + public static long creatProject(String url, String token, String projectName, + String description) { + try { + Map header = buildHeader(token); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_PROJECT_NAME, projectName); + queryParams.put(DS_PROJECT_DESC, description); + + JsonObject response = executeHttpRequest(url, POST, queryParams, header); + + JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject(); + LOGGER.info("create project success, project data: {}", data); + + return data != null ? data.get(DS_CODE).getAsLong() : 0; + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during creating project", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error creating project with name: %s and description: %s at URL: %s", + projectName, description, url)); + + } catch (DolphinScheduleException e) { + LOGGER.error("Creating project failed: {}", e.getMessage()); + throw new DolphinScheduleException( + PROJECT_CREATION_FAILED, + String.format("Error creating project with name: %s and description: %s at URL: %s", + projectName, description, url)); + } + } + + /** + * Query all process definition in project + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @return Map of all the process definition + */ + public static Map queryAllProcessDef(String url, String token) { + Map header = buildHeader(token); + try { + JsonObject response = executeHttpRequest(url, GET, new HashMap<>(), header); + + Map processDef = + StreamSupport.stream(response.get(DS_RESPONSE_DATA).getAsJsonArray().spliterator(), false) + .map(JsonElement::getAsJsonObject) + .collect(Collectors.toMap( + jsonObject -> jsonObject.get(DS_CODE).getAsLong(), + jsonObject -> jsonObject.get(DS_PROCESS_NAME).getAsString())); + + LOGGER.info("Query all process definition success, processes info: {}", processDef); + return processDef; + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during query all process definition", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error querying all process definitions at URL: %s", url)); + + } catch (DolphinScheduleException e) { + LOGGER.info("Query all process definition failed: {}", e.getMessage()); + throw new DolphinScheduleException( + PROCESS_DEFINITION_QUERY_FAILED, + String.format("Error querying all process definitions at URL: %s", url)); + } + + } + + /** + * Generates a new task code in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @return The task code (ID) if generation is successful, or 0 if an error occurs. + */ + public static long genTaskCode(String url, String token) { + try { + Map header = buildHeader(token); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_TASK_GEN_NUM, DS_DEFAULT_TASK_GEN_NUM); + + JsonObject response = executeHttpRequest(url, GET, queryParams, header); + + JsonArray data = response.get(DS_RESPONSE_DATA).getAsJsonArray(); + + LOGGER.info("Generate task code success, task code data: {}", data); + return data != null && data.size() == 1 ? data.get(0).getAsLong() : 0; + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during generate task code", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error generate task code at URL: %s", url)); + + } catch (DolphinScheduleException e) { + LOGGER.info("generate task code failed: {}", e.getMessage()); + throw new DolphinScheduleException( + GEN_TASK_CODE_FAILED, + String.format("Error generate task code at URL: %s", url)); + } + } + + /** + * Creates a process definition in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param name The name of the process definition. + * @param desc The description of the process definition. + * @param taskCode The task code to be associated with this process definition. + * @param host The host where the process will run. + * @param port The port where the process will run. + * @param username The username for authentication. + * @param password The password for authentication. + * @param offset The offset for the scheduling. + * @param groupId The group ID of the process. + * @return The process definition code (ID) if creation is successful, or 0 if an error occurs. + */ + public static long createProcessDef(String url, String token, String name, String desc, + long taskCode, String host, + int port, String username, String password, long offset, String groupId) throws Exception { + try { + Map header = buildHeader(token); + + DSTaskRelation taskRelation = new DSTaskRelation(); + taskRelation.setPostTaskCode(taskCode); + String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation)); + + DSTaskParams taskParams = new DSTaskParams(); + taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId)); + + DSTaskDefinition taskDefinition = new DSTaskDefinition(); + taskDefinition.setCode(taskCode); + taskDefinition.setName(DS_DEFAULT_TASK_NAME); + taskDefinition.setDescription(DS_DEFAULT_TASK_DESC); + taskDefinition.setTaskParams(taskParams); + String taskDefinitionJson = MAPPER.writeValueAsString(Collections.singletonList(taskDefinition)); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_TASK_RELATION, taskRelationJson); + queryParams.put(DS_TASK_DEFINITION, taskDefinitionJson); + queryParams.put(DS_PROCESS_NAME, name); + queryParams.put(DS_PROCESS_DESC, desc); + + JsonObject data = executeHttpRequest(url, POST, queryParams, header); + + LOGGER.info("create process definition success, process definition data: {}", data); + return data != null ? data.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_CODE).getAsLong() : 0; + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during creating process definition", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error creating process definition with name: %s and description: %s at URL: %s", + name, desc, url)); + + } catch (DolphinScheduleException e) { + throw new DolphinScheduleException( + PROCESS_DEFINITION_CREATION_FAILED, + String.format("Error creating process definition with name: %s and description: %s at URL: %s", + name, desc, url)); + } + } + + /** + * Releases a process definition in DolphinScheduler. + * + * @param processDefUrl The URL to release the process definition. + * @param processDefCode The ID of the process definition. + * @param token The authentication token to be used in the request header. + * @param status The status to set for the process definition (e.g., "online" or "offline"). + * @return true if the process definition was successfully released, false otherwise. + */ + public static boolean releaseProcessDef(String processDefUrl, long processDefCode, + String token, String status) { + try { + String url = processDefUrl + "/" + processDefCode + DS_RELEASE_URL; + Map header = buildHeader(token); + + Map queryParam = new HashMap<>(); + queryParam.put(DS_RELEASE_STATE, status); + + JsonObject response = executeHttpRequest(url, POST, queryParam, header); + LOGGER.info("release process definition success, response data: {}", response); + + return response.get(DS_RESPONSE_DATA).getAsBoolean(); + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during releasing process definition", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error releasing process definition with code: %d and status: %s at URL: %s", + processDefCode, status, processDefUrl)); + + } catch (DolphinScheduleException e) { + throw new DolphinScheduleException( + PROCESS_DEFINITION_RELEASE_FAILED, + String.format("Error releasing process definition with code: %d and status: %s at URL: %s", + processDefCode, status, processDefUrl)); + } + } + + /** + * Create a schedule for process definition in DolphinScheduler. + * + * @param url The URL to create a schedule for the process definition. + * @param processDefCode The ID of the process definition. + * @param token The authentication token to be used in the request header. + * @param scheduleInfo The schedule info + * @return The schedule id + */ + public static int createScheduleForProcessDef(String url, long processDefCode, + String token, ScheduleInfo scheduleInfo) throws Exception { + + try { + Map header = buildHeader(token); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DS_DEFAULT_SCHEDULE_TIME_FORMAT); + String startTime = scheduleInfo.getStartTime().toLocalDateTime() + .atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter); + String endTime = scheduleInfo.getEndTime().toLocalDateTime() + .atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter); + + String crontab; + switch (scheduleInfo.getScheduleType()) { + case 0: + crontab = generateCrontabExpression(scheduleInfo.getScheduleUnit(), + scheduleInfo.getScheduleInterval()); + break; + + case 1: + crontab = scheduleInfo.getCrontabExpression(); + break; + + default: + LOGGER.error("Unsupported schedule type: {}", scheduleInfo.getScheduleType()); + throw new DolphinScheduleException("Unsupported schedule type: " + scheduleInfo.getScheduleType()); + } + + DScheduleInfo dScheduleInfo = new DScheduleInfo(); + dScheduleInfo.setStartTime(startTime); + dScheduleInfo.setEndTime(endTime); + dScheduleInfo.setCrontab(crontab); + dScheduleInfo.setTimezoneId(DS_DEFAULT_TIMEZONE_ID); + String scheduleDef = MAPPER.writeValueAsString(dScheduleInfo); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_PROCESS_CODE, String.valueOf(processDefCode)); + queryParams.put(DS_SCHEDULE_DEF, scheduleDef); + + JsonObject response = executeHttpRequest(url, POST, queryParams, header); + LOGGER.info("create schedule for process definition success, response data: {}", response); + + return response.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_ID).getAsInt(); + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during releasing process definition", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error creating schedule for process definition code: %d at URL: %s", + processDefCode, url)); + + } catch (DolphinScheduleException e) { + throw new DolphinScheduleException( + SCHEDULE_CREATION_FAILED, + String.format("Error creating schedule for process definition code: %d at URL: %s", + processDefCode, url)); + } + } + + /** + * Online the schedule for process definition in DolphinScheduler. + * + * @param scheduleUrl The URL to online the schedule for process definition. + * @param scheduleId The ID of the schedule of process definition. + * @param token The authentication token to be used in the request header. + * @return whether online is succeeded + */ + public static boolean onlineScheduleForProcessDef(String scheduleUrl, int scheduleId, + String token) { + try { + Map header = buildHeader(token); + + String url = scheduleUrl + "/" + scheduleId + DS_ONLINE_URL; + JsonObject response = executeHttpRequest(url, POST, new HashMap<>(), header); + LOGGER.info("online schedule for process definition success, response data: {}", response); + + if (response != null && !response.get(DS_RESPONSE_DATA).isJsonNull()) { + return response.get(DS_RESPONSE_DATA).getAsBoolean(); + } + return false; + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during online schedule", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error online schedule with ID: %d online at URL: %s", scheduleId, scheduleUrl)); + + } catch (DolphinScheduleException e) { + throw new DolphinScheduleException( + SCHEDULE_ONLINE_FAILED, + String.format("Error online schedule with ID: %d online at URL: %s", scheduleId, scheduleUrl)); + } + } + + /** + * Delete the process definition in DolphinScheduler. + * + * @param url The URL to delete the project or process definition. + * @param token The authentication token to be used in the request header. + * @param code The project code or process definition code + */ + public static void delete(String url, String token, long code) { + try { + Map header = buildHeader(token); + + String requestUrl = url + "/" + code; + + JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header); + LOGGER.info("delete process or project success, response data: {}", response); + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during deleting process or project", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error deleting process or project with code: %d at URL: %s", code, url), e); + + } catch (DolphinScheduleException e) { + throw new DolphinScheduleException( + DELETION_FAILED, + String.format("Error deleting process or project with code: %d at URL: %s", code, url), e); + } + } + + /** + * Builds the header map for HTTP requests, including the authentication token. + * + * @param token The authentication token for the request. + * @return A map representing the headers of the HTTP request. + */ + private static Map buildHeader(String token) { + Map headers = new HashMap<>(); + if (StringUtils.isNotEmpty(token)) { + headers.put(DS_TOKEN, token); + } + return headers; + } + + /** + * Builds a query parameter map used for API calls that need to paginate or filter results. + * This method can be used for searching projects or tasks. + * + * @param searchVal The value to search for. + * @return A map containing the necessary query parameters. + */ + private static Map buildPageParam(String searchVal) { + Map queryParams = new HashMap<>(); + queryParams.put(DS_SEARCH_VAL, searchVal); + queryParams.put(DS_PAGE_SIZE, DS_DEFAULT_PAGE_SIZE); + queryParams.put(DS_PAGE_NO, DS_DEFAULT_PAGE_NO); + return queryParams; + } + + /** + * Calculate the offset according to schedule info + * + * @param scheduleInfo The schedule info + * @return timestamp between two schedule task + */ + public static long calculateOffset(ScheduleInfo scheduleInfo) { + if (scheduleInfo == null) { + LOGGER.error("ScheduleInfo cannot be null"); + throw new DolphinScheduleException("ScheduleInfo cannot be null"); + } + + long offset = 0; + + // Determine offset based on schedule type + if (scheduleInfo.getScheduleType() == null) { + LOGGER.error("Schedule type cannot be null"); + throw new DolphinScheduleException("Schedule type cannot be null"); + } + + switch (scheduleInfo.getScheduleType()) { + case 0: // Normal scheduling + offset = calculateNormalOffset(scheduleInfo); + break; + case 1: // Crontab scheduling + offset = calculateCronOffset(scheduleInfo); + break; + default: + LOGGER.error("Invalid schedule type"); + throw new DolphinScheduleException( + UNSUPPORTED_SCHEDULE_TYPE, "Invalid schedule type"); + } + + // Add delay time if specified + if (scheduleInfo.getDelayTime() != null) { + offset += scheduleInfo.getDelayTime() * MILLIS_IN_SECOND; + } + + return offset; + } + + private static long calculateNormalOffset(ScheduleInfo scheduleInfo) { + if (scheduleInfo.getScheduleInterval() == null || scheduleInfo.getScheduleUnit() == null) { + LOGGER.error("Schedule interval and unit cannot be null for normal scheduling"); + throw new IllegalArgumentException("Schedule interval and unit cannot be null for normal scheduling"); + } + switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()))) { + case YEAR: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_YEAR; + case MONTH: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_MONTH; + case WEEK: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_WEEK; + case DAY: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_DAY; + case HOUR: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_HOUR; + case MINUTE: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_MINUTE; + case SECOND: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_SECOND; + case ONE_ROUND: + return scheduleInfo.getScheduleInterval(); + default: + LOGGER.error("Invalid schedule unit"); + throw new DolphinScheduleException("Invalid schedule unit"); + } + } + + private static long calculateCronOffset(ScheduleInfo scheduleInfo) { + if (scheduleInfo.getCrontabExpression() == null) { + LOGGER.error("Crontab expression cannot be null for schedule type crontab"); + throw new DolphinScheduleException("Crontab expression cannot be null for schedule type crontab"); + } + + try { + CronExpression cronExpression = new CronExpression(scheduleInfo.getCrontabExpression()); + Date firstExecution = cronExpression.getNextValidTimeAfter(new Date()); + Date secondExecution = cronExpression.getNextValidTimeAfter(firstExecution); + + if (secondExecution != null) { + return secondExecution.getTime() - firstExecution.getTime(); + } else { + LOGGER.error("Unable to calculate the next execution times for the cron expression"); + throw new DolphinScheduleException( + "Unable to calculate the next execution times for the cron expression"); + } + } catch (Exception e) { + LOGGER.error("Invalid cron expression: ", e); + throw new DolphinScheduleException(String.format("Invalid cron expression: %s", e.getMessage())); + } + } + + private static String generateCrontabExpression(String scheduleUnit, Integer scheduleInterval) { + if (scheduleUnit.isEmpty()) { + LOGGER.error("Schedule unit and interval must not be null for generating crontab expression"); + throw new DolphinScheduleException( + "Schedule unit and interval must not be null for generating crontab expression"); + } + String crontabExpression; + + switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleUnit))) { + case SECOND: + crontabExpression = String.format("0/%d * * * * ? *", scheduleInterval); + break; + case MINUTE: + crontabExpression = String.format("* 0/%d * * * ? *", scheduleInterval); + break; + case HOUR: + crontabExpression = String.format("* * 0/%d * * ? *", scheduleInterval); + break; + case DAY: + crontabExpression = String.format("* * * 1/%d * ? *", scheduleInterval); + break; + case WEEK: + crontabExpression = String.format("* * * 1/%d * ? *", scheduleInterval * 7); + break; + case MONTH: + crontabExpression = String.format("* * * * 0/%d ? *", scheduleInterval); + break; + case YEAR: + crontabExpression = String.format("* * * * * ? 0/%d", scheduleInterval); + break; + default: + LOGGER.error("Unsupported schedule unit for generating crontab: {}", scheduleUnit); + throw new DolphinScheduleException("Unsupported schedule unit for generating crontab: " + scheduleUnit); + } + + return crontabExpression; + } + + /** + * Executes an HTTP request using OkHttp. Supports various HTTP methods (GET, POST, PUT, DELETE). + * + * @param url The URL of the request. + * @param method The HTTP method (GET, POST, PUT, DELETE). + * @param queryParams The query parameters for the request (optional). + * @param headers The headers for the request. + * @return A JsonObject containing the response from the server. + * @throws DolphinScheduleException If an error occurs during the request. + */ + private static JsonObject executeHttpRequest(String url, String method, Map queryParams, + Map headers) { + HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(url)).newBuilder(); + + for (Map.Entry entry : queryParams.entrySet()) { + urlBuilder.addQueryParameter(entry.getKey(), entry.getValue()); + } + HttpUrl httpUrl = urlBuilder.build(); + + Request.Builder requestBuilder = new Request.Builder() + .url(httpUrl); + + for (Map.Entry entry : headers.entrySet()) { + requestBuilder.addHeader(entry.getKey(), entry.getValue()); + } + RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE), ""); + + switch (method.toUpperCase()) { + case POST: + requestBuilder.post(body); + break; + case GET: + requestBuilder.get(); + break; + case DELETE: + requestBuilder.delete(body); + break; + default: + throw new DolphinScheduleException(INVALID_HTTP_METHOD, + String.format("Unsupported request method: %s", method)); + } + + Request request = requestBuilder.build(); + + // get response + try (Response response = CLIENT.newCall(request).execute()) { + String responseBody = response.body() != null ? response.body().string() : null; + LOGGER.debug("HTTP request to {} completed with status code {}", httpUrl, response.code()); + + if (response.isSuccessful() && responseBody != null) { + return JsonParser.parseString(responseBody).getAsJsonObject(); + } else { + LOGGER.error("HTTP request to {} failed. HTTP Status: {}, Response Body: {}", httpUrl, response.code(), + responseBody != null ? responseBody : "No response body"); + + throw new DolphinScheduleException( + HTTP_REQUEST_FAILED, + String.format("HTTP request to %s failed. Status: %d, Response: %s", + httpUrl, response.code(), responseBody != null ? responseBody : "No response body")); + } + } catch (IOException e) { + throw new DolphinScheduleException( + NETWORK_ERROR, + String.format("Network error during HTTP request to %s. Reason: %s", httpUrl, e.getMessage())); + } catch (Exception e) { + throw new DolphinScheduleException( + UNEXPECTED_ERROR, + String.format("Unexpected error during HTTP request to %s. Reason: %s", httpUrl, e.getMessage())); + } + } + + /** + * Shell node in DolphinScheduler need to write in a script + * When process definition schedule run, the shell node run, + * Call back in inlong, sending a request with parameters required + */ + private static String buildScript(String host, int port, String username, String password, long offset, + String groupId) { + LOGGER.info("build script for host: {}, port: {}, username: {}, password: {}, offset: {}, groupId: {}", host, + port, username, password, offset, groupId); + return "#!/bin/bash\n\n" + + + // Get current timestamp + "# Get current timestamp\n" + + "lowerBoundary=$(date +%s)\n" + + "echo \"get lowerBoundary: ${lowerBoundary}\"\n" + + "upperBoundary=$(($lowerBoundary + " + offset + "))\n" + + "echo \"get upperBoundary: ${upperBoundary}\"\n\n" + + + // Set URL + "# Set URL and HTTP method\n" + + "url=\"http://" + host + ":" + port + SHELL_REQUEST_API + + "?username=" + username + "&password=" + password + "\"\n" + + "echo \"get url: ${url}\"\n" + + + // Set HTTP method + "httpMethod=\"POST\"\n\n" + + + // Set request body + "# Build request body\n" + + "jsonBody=$(cat < dolphinSchedulerContainer = + new GenericContainer<>(DS_IMAGE_NAME + ":" + DS_IMAGE_TAG) + .withExposedPorts(12345, 25333) + .withEnv("TZ", DS_DEFAULT_TIMEZONE_ID) + .withNetwork(NETWORK) + .withAccessToHost(true) + .withNetworkAliases(INTER_CONTAINER_DS_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(DS_LOG)); + + /** + * This method just for DS testing, login by default admin username and password + * generate a 1-day expiring token for test, the token will disappear with the DS container shutting down + * + * @return the DS token + */ + protected static String accessToken() { + Map loginParams = new HashMap<>(); + loginParams.put(DS_USERNAME, DS_DEFAULT_USERNAME); + loginParams.put(DS_PASSWORD, DS_DEFAULT_PASSWORD); + try { + JsonObject loginResponse = + executeHttpRequest(DS_DEFAULT_SERVICE_URL + DS_LOGIN_URL, loginParams, new HashMap<>()); + if (loginResponse.get("success").getAsBoolean()) { + String tokenGenUrl = DS_DEFAULT_SERVICE_URL + DS_TOKEN_URL + DS_TOKEN_GEN_URL; + Map tokenParams = new HashMap<>(); + tokenParams.put(DS_USERID, String.valueOf(DS_DEFAULT_USERID)); + + LocalDateTime now = LocalDateTime.now(); + LocalDateTime tomorrow = now.plusDays(1); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DS_EXPIRE_TIME_FORMAT); + String expireTime = tomorrow.format(formatter); + tokenParams.put(DS_EXPIRE_TIME, expireTime); + + Map cookies = new HashMap<>(); + cookies.put(DS_COOKIE_SC_TYPE, loginResponse.get(DS_RESPONSE_DATA) + .getAsJsonObject().get(DS_COOKIE_SC_TYPE).getAsString()); + cookies.put(DS_COOKIE_SESSION_ID, loginResponse.get(DS_RESPONSE_DATA) + .getAsJsonObject().get(DS_COOKIE_SESSION_ID).getAsString()); + + JsonObject tokenGenResponse = executeHttpRequest(tokenGenUrl, tokenParams, cookies); + + String accessTokenUrl = DS_DEFAULT_SERVICE_URL + DS_TOKEN_URL; + tokenParams.put(DS_RESPONSE_TOKEN, tokenGenResponse.get(DS_RESPONSE_DATA).getAsString()); + JsonObject result = executeHttpRequest(accessTokenUrl, tokenParams, cookies); + String token = result.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_RESPONSE_TOKEN).getAsString(); + DS_LOG.info("login and generate token success, token: {}", token); + return token; + } + return null; + } catch (Exception e) { + DS_LOG.error("login and generate token fail: ", e); + throw new DolphinScheduleException(String.format("login and generate token fail: %s", e.getMessage())); + } + } + + private static JsonObject executeHttpRequest(String url, Map queryParams, + Map cookies) throws IOException { + OkHttpClient client = new OkHttpClient(); + + // Build query parameters + HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(url)).newBuilder(); + for (Map.Entry entry : queryParams.entrySet()) { + urlBuilder.addQueryParameter(entry.getKey(), entry.getValue()); + } + HttpUrl httpUrl = urlBuilder.build(); + + // Build the request + Request.Builder requestBuilder = new Request.Builder() + .url(httpUrl); + + // Add cookies to the request + if (cookies != null && !cookies.isEmpty()) { + String cookieHeader = cookies.entrySet() + .stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining("; ")); + requestBuilder.header(DS_COOKIE, cookieHeader); + } + + RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE), ""); + requestBuilder.post(body); + + Request request = requestBuilder.build(); + + // Execute the request and parse the response + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful() && response.body() != null) { + String responseBody = response.body().string(); + return JsonParser.parseString(responseBody).getAsJsonObject(); + } else { + DS_LOG.error("Unexpected http response error: {}", response); + throw new DolphinScheduleException("Unexpected http response error " + response); + } + } + } + +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java new file mode 100644 index 00000000000..f95a5268ee3 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.Timeout; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; + +import javax.annotation.Resource; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +@SpringBootTest(classes = DolphinScheduleEngineTest.class) +@ComponentScan(basePackages = "org.apache.inlong.manager") +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class DolphinScheduleEngineTest extends DolphinScheduleContainerTestEnv { + + @Resource + private DolphinScheduleEngine dolphinScheduleEngine; + + @BeforeAll + public void beforeAll() { + dolphinSchedulerContainer.setPortBindings(Arrays.asList("12345:12345", "25333:25333")); + dolphinSchedulerContainer.start(); + assertTrue(dolphinSchedulerContainer.isRunning(), "DolphinScheduler container should be running"); + + String token = accessToken(); + dolphinScheduleEngine.setToken(token); + dolphinScheduleEngine.start(); + } + + @AfterAll + public void afterAll() { + dolphinScheduleEngine.stop(); + if (dolphinSchedulerContainer != null) { + dolphinSchedulerContainer.stop(); + } + } + + @Test + @Order(1) + @Timeout(30) + public void testRegisterScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testRegister(scheduleInfo); + + // 2. test for cron schedule + scheduleInfo = genDefaultCronScheduleInfo(); + testRegister(scheduleInfo); + } + + private void testRegister(ScheduleInfo scheduleInfo) { + // register schedule info + dolphinScheduleEngine.handleRegister(scheduleInfo); + assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size()); + } + + @Test + @Order(2) + @Timeout(30) + public void testUnRegisterScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testUnRegister(scheduleInfo); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = genDefaultCronScheduleInfo(); + testUnRegister(scheduleInfo); + } + + private void testUnRegister(ScheduleInfo scheduleInfo) { + // register schedule info + dolphinScheduleEngine.handleRegister(scheduleInfo); + assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size()); + + // Un-register schedule info + dolphinScheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()); + assertEquals(0, dolphinScheduleEngine.getScheduledProcessMap().size()); + } + + @Test + @Order(3) + @Timeout(30) + public void testUpdateScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testRegister(scheduleInfo); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = genDefaultCronScheduleInfo(); + testUpdate(scheduleInfo); + } + + private void testUpdate(ScheduleInfo scheduleInfo) { + // register schedule info + dolphinScheduleEngine.handleUpdate(scheduleInfo); + assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size()); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java new file mode 100644 index 00000000000..a2f6d97e0cd --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +public class DolphinSchedulerContainerEnvConstants { + + // DS env image related constants + protected static final String DS_IMAGE_NAME = "apache/dolphinscheduler-standalone-server"; + protected static final String DS_IMAGE_TAG = "3.2.2"; + protected static final String INTER_CONTAINER_DS_ALIAS = "dolphinscheduler"; + + // DS env url related constants + protected static final String DS_DEFAULT_SERVICE_URL = "http://127.0.0.1:12345/dolphinscheduler"; + protected static final String DS_LOGIN_URL = "/login"; + protected static final String DS_TOKEN_URL = "/access-tokens"; + protected static final String DS_TOKEN_GEN_URL = "/generate"; + + // DS env api params related constants + protected static final String DS_USERNAME = "userName"; + protected static final String DS_PASSWORD = "userPassword"; + protected static final String DS_USERID = "userId"; + protected static final String DS_COOKIE = "Cookie"; + protected static final String DS_COOKIE_SC_TYPE = "securityConfigType"; + protected static final String DS_COOKIE_SESSION_ID = "sessionId"; + protected static final String DS_EXPIRE_TIME = "expireTime"; + protected static final String DS_EXPIRE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + // DS env token related constants + protected static final String DS_RESPONSE_TOKEN = "token"; + + // DS env default admin user info + protected static final String DS_DEFAULT_USERNAME = "admin"; + protected static final String DS_DEFAULT_PASSWORD = "dolphinscheduler123"; + protected static final Integer DS_DEFAULT_USERID = 1; + +} diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 2bad5f801f7..e05c66a6742 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -103,11 +103,13 @@ agent.install.temp.path=inlong/agent-installer-temp/ # The primary key id of the default agent module used default.module.id=1 -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none +# Dirty log dirty.log.clean.enabled=false dirty.log.clean.interval.minutes=5 dirty.dirty.retention.minutes=10 -dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg \ No newline at end of file +dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg + +# DolphinScheduler related config +schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler +schedule.engine.dolphinscheduler.token=default_token_value diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 040c868bcf8..7441ea55e68 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -95,11 +95,12 @@ group.deleted.enabled=false # Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none - +# Dirty log dirty.log.clean.enabled=false dirty.log.clean.interval.minutes=5 dirty.dirty.retention.minutes=10 -dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg \ No newline at end of file +dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg + +# DolphinScheduler related config +schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler +schedule.engine.dolphinscheduler.token=default_token_value diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 393eef6b05a..f0e42182c5f 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -96,11 +96,12 @@ group.deleted.enabled=false # Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none - +# Dirty log dirty.log.clean.enabled=false dirty.log.clean.interval.minutes=5 dirty.dirty.retention.minutes=10 -dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg \ No newline at end of file +dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg + +# DolphinScheduler related config +schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler +schedule.engine.dolphinscheduler.token=default_token_value