Skip to content

Commit

Permalink
[INLONG-11483][Manager] Support multiple scedule engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Aloys Zhang committed Nov 12, 2024
1 parent cef2dbd commit f42db2a
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ScheduleEntity implements Serializable {
private String inlongGroupId;
// schedule type, support [normal, crontab], 0 for normal and 1 for crontab
private Integer scheduleType;
// schedule engine type, support [Quartz, Airflow, Dolphinscheduler]
private String scheduleEngine;
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
private String scheduleUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<id column="id" jdbcType="INTEGER" property="id"/>
<result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
<result column="schedule_type" jdbcType="INTEGER" property="scheduleType"/>
<result column="schedule_engine" jdbcType="VARCHAR" property="scheduleEngine"/>
<result column="schedule_unit" jdbcType="VARCHAR" property="scheduleUnit"/>
<result column="schedule_interval" jdbcType="INTEGER" property="scheduleInterval"/>
<result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
Expand All @@ -42,25 +43,25 @@
</resultMap>

<sql id="Base_Column_List">
id, inlong_group_id, schedule_type, schedule_unit, schedule_interval, start_time,
id, inlong_group_id, schedule_type, schedule_engine, schedule_unit, schedule_interval, start_time,
end_time, delay_time, self_depend, task_parallelism, crontab_expression,
status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version
</sql>

<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
insert into schedule_config (id, inlong_group_id, schedule_type, schedule_unit,
schedule_interval, start_time, end_time, delay_time,
self_depend, task_parallelism, crontab_expression,
insert into schedule_config (id, inlong_group_id, schedule_type, schedule_engine,
schedule_unit, schedule_interval, start_time, end_time,
delay_time, self_depend, task_parallelism, crontab_expression,
status, previous_status, creator, modifier)
values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR},
#{scheduleType, jdbcType=INTEGER}, #{scheduleUnit, jdbcType=VARCHAR},
#{scheduleInterval, jdbcType=INTEGER}, #{startTime, jdbcType=TIMESTAMP},
#{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER},
#{selfDepend, jdbcType=INTEGER}, #{taskParallelism, jdbcType=INTEGER},
#{crontabExpression, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
#{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
#{modifier,jdbcType=VARCHAR})
#{scheduleType, jdbcType=INTEGER}, #{scheduleEngine, jdbcType=VARCHAR},
#{scheduleUnit, jdbcType=VARCHAR}, #{scheduleInterval, jdbcType=INTEGER},
#{startTime, jdbcType=TIMESTAMP}, #{endTime, jdbcType=TIMESTAMP},
#{delayTime, jdbcType=INTEGER}, #{selfDepend, jdbcType=INTEGER},
#{taskParallelism, jdbcType=INTEGER}, #{crontabExpression, jdbcType=VARCHAR},
#{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>

<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
Expand Down Expand Up @@ -88,6 +89,9 @@
<if test="scheduleType != null">
schedule_type = #{scheduleType, jdbcType=INTEGER},
</if>
<if test="scheduleEngine != null">
schedule_engine = #{scheduleEngine, jdbcType=VARCHAR},
</if>
<if test="scheduleUnit !=null">
schedule_unit = #{scheduleUnit, jdbcType=VARCHAR},
</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
public static final String GROUP_ID_PREFIX = "test_group_";
public static final String USER = "admin";
public static final int SCHEDULE_TYPE = 0;
public static final String SCHEDULE_ENGINE = "Quartz";
public static final int SCHEDULE_TYPE_NEW = 1;
public static final String SCHEDULE_UNIT = "H";
public static final String SCHEDULE_UNIT_NEW = "D";
Expand Down Expand Up @@ -63,6 +64,7 @@ public void testUpdate() throws Exception {
ScheduleEntity entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
Assertions.assertEquals(scheduleEntity.getInlongGroupId(), entityQueried.getInlongGroupId());
Assertions.assertEquals(SCHEDULE_TYPE, entityQueried.getScheduleType());
Assertions.assertEquals(SCHEDULE_ENGINE, entityQueried.getScheduleEngine());
Assertions.assertEquals(SCHEDULE_UNIT, entityQueried.getScheduleUnit());
Assertions.assertEquals(SCHEDULE_INTERVAL, entityQueried.getScheduleInterval());
Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
Expand Down Expand Up @@ -105,6 +107,7 @@ private ScheduleEntity genEntity() {
ScheduleEntity entity = new ScheduleEntity();
entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis());
entity.setScheduleType(SCHEDULE_TYPE);
entity.setScheduleEngine(SCHEDULE_ENGINE);
entity.setScheduleUnit(SCHEDULE_UNIT);
entity.setScheduleInterval(SCHEDULE_INTERVAL);
entity.setStartTime(DEFAULT_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// schedule engine type, support [Quartz, Airflow, Dolphinscheduler]
@ApiModelProperty(value = "Schedule engine, support Quartz, Airflow and Dolphinscheduler")
@Length(min = 1, max = 20, message = "length must be between 1 and 20")
private String scheduleEngine;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.sql.Timestamp;
import java.util.Objects;
import org.hibernate.validator.constraints.Length;

@Data
@Builder
Expand All @@ -50,6 +51,10 @@ public class ScheduleInfo {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// schedule engine type, support [Quartz, Airflow, Dolphinscheduler]
@ApiModelProperty("Schedule engine")
private String scheduleEngine;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
Expand Down Expand Up @@ -91,6 +96,7 @@ public boolean equals(Object o) {
ScheduleInfo that = (ScheduleInfo) o;
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
&& Objects.equals(scheduleEngine, that.scheduleEngine)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime)
Expand All @@ -103,9 +109,8 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime,
delayTime,
selfDepend, taskParallelism, crontabExpression, version);
return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine, scheduleUnit, scheduleInterval, startTime,
endTime, delayTime, selfDepend, taskParallelism, crontabExpression, version);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.sql.Timestamp;
import java.util.Objects;
import org.hibernate.validator.constraints.Length;

@Data
@ApiModel("Schedule request")
Expand All @@ -44,6 +45,10 @@ public class ScheduleInfoRequest {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// schedule engine type, support [Quartz, Airflow, Dolphinscheduler]
@ApiModelProperty(value = "Schedule engine")
private String scheduleEngine;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
Expand Down Expand Up @@ -85,6 +90,7 @@ public boolean equals(Object o) {
ScheduleInfoRequest that = (ScheduleInfoRequest) o;
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
&& Objects.equals(scheduleEngine, that.scheduleEngine)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime)
Expand All @@ -97,8 +103,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime,
delayTime,
selfDepend, taskParallelism, crontabExpression, version);
return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine, scheduleUnit, scheduleInterval,
startTime, endTime, delayTime, selfDepend, taskParallelism, crontabExpression, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.List;
Expand All @@ -34,13 +33,10 @@ public class ScheduleClientFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleClientFactory.class);

@Value("${inlong.schedule.engine:none}")
private String scheduleEngineName;

@Autowired
List<ScheduleEngineClient> scheduleEngineClients;

public ScheduleEngineClient getInstance() {
public ScheduleEngineClient getInstance(String scheduleEngineName) {
Optional<ScheduleEngineClient> optScheduleClient =
scheduleEngineClients.stream().filter(t -> t.accept(scheduleEngineName)).findFirst();
if (!optScheduleClient.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, Str
}
}

private ScheduleEngineClient getScheduleEngineClient() {
private ScheduleEngineClient getScheduleEngineClient(String scheduleEngine) {
if (scheduleEngineClient == null) {
scheduleEngineClient = scheduleClientFactory.getInstance();
scheduleEngineClient = scheduleClientFactory.getInstance(scheduleEngine);
}
return scheduleEngineClient;
}
Expand Down Expand Up @@ -143,8 +143,8 @@ public Boolean updateAndRegister(ScheduleInfoRequest request, String operator) {
* */
private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String operator, boolean isUpdate) {
// update(un-register and then register) or register
boolean res = isUpdate ? getScheduleEngineClient().update(scheduleInfo)
: getScheduleEngineClient().register(scheduleInfo);
boolean res = isUpdate ? getScheduleEngineClient(scheduleInfo.getScheduleEngine()).update(scheduleInfo)
: getScheduleEngineClient(scheduleInfo.getScheduleEngine()).register(scheduleInfo);
// update status to REGISTERED
scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), REGISTERED, operator);
LOGGER.info("{} schedule info success for group {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
`schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and Dolphinscheduler',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
Expand Down
1 change: 1 addition & 0 deletions inlong-manager/manager-web/sql/apache_inlong_manager.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
`schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and Dolphinscheduler',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
Expand Down
27 changes: 27 additions & 0 deletions inlong-manager/manager-web/sql/changes-2.1.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
-- When upgrading to version 1.5.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module.

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

USE `apache_inlong_manager`;

ALTER TABLE `schedule_config`
ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and Dolphinscheduler';

0 comments on commit f42db2a

Please sign in to comment.