Skip to content

Commit

Permalink
[INLONG-10360][Manager] Combine schedule state transition with group …
Browse files Browse the repository at this point in the history
…operations (#10445)

Co-authored-by: fuweng11 <[email protected]>
  • Loading branch information
aloyszhang and fuweng11 committed Jun 25, 2024
1 parent 50b7705 commit e3beb8c
Show file tree
Hide file tree
Showing 29 changed files with 746 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public enum ErrorCodeEnum {

SCHEDULE_NOT_FOUND(1700, "Schedule info not found"),
SCHEDULE_DUPLICATE(1701, "Schedule info already exist"),
SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"),
SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition is not allowed"),

WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no operation authority"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,25 @@

import lombok.Getter;

/**
* Status for schedule info.
* This is the transient status of the schedule info.
* With specified operations, the status will change to corresponding value.
* Status Operations
* NEW inlong group created with schedule info
* APPROVED the new inlong group approved by admin
* REGISTERED schedule info registered to schedule engine
* UPDATED update schedule info for a group
* DELETED delete a group
* */
@Getter
public enum ScheduleStatus {

NEW(100, "new"),
DELETED(40, "deleted");
APPROVED(101, "approved"),
REGISTERED(102, "registered"),
UPDATED(103, "updated"),
DELETED(99, "deleted");

private final Integer code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.sql.Timestamp;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -137,6 +138,36 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
@ApiModelProperty(value = "Inlong tenant")
private String tenant;

// schedule type, support [normal, crontab], 0 for normal and 1 for crontab
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;

@ApiModelProperty("Schedule interval")
private Integer scheduleInterval;

@ApiModelProperty("Start time")
private Timestamp startTime;

@ApiModelProperty("End time")
private Timestamp endTime;

@ApiModelProperty("Delay time")
private Integer delayTime;

@ApiModelProperty("Self depend")
private Integer selfDepend;

@ApiModelProperty("Schedule task parallelism")
private Integer taskParallelism;

@ApiModelProperty("Schedule task parallelism")
private Integer crontabExpression;

public abstract InlongGroupRequest genRequest();

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;

import java.sql.Timestamp;
import java.util.List;

/**
Expand Down Expand Up @@ -130,4 +131,34 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@NotNull(groups = UpdateValidation.class, message = "version cannot be null")
private Integer version;

// schedule type, support [normal, crontab], 0 for normal and 1 for crontab
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;

@ApiModelProperty("Schedule interval")
private Integer scheduleInterval;

@ApiModelProperty("Start time")
private Timestamp startTime;

@ApiModelProperty("End time")
private Timestamp endTime;

@ApiModelProperty("Delay time")
private Integer delayTime;

@ApiModelProperty("Self depend")
private Integer selfDepend;

@ApiModelProperty("Schedule task parallelism")
private Integer taskParallelism;

@ApiModelProperty("Schedule task parallelism")
private Integer crontabExpression;

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.validation.constraints.NotNull;

import java.sql.Timestamp;
import java.util.Objects;

@Data
@Builder
Expand Down Expand Up @@ -79,4 +80,32 @@ public class ScheduleInfo {
@NotNull(groups = UpdateValidation.class, message = "version cannot be null")
private Integer version;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ScheduleInfo that = (ScheduleInfo) o;
return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit,
that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend,
that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
&& Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version,
that.version);
}

@Override
public int hashCode() {
return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime,
delayTime,
selfDepend, taskParallelism, crontabExpression, version);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.validation.constraints.NotNull;

import java.sql.Timestamp;
import java.util.Objects;

@Data
@ApiModel("Schedule request")
Expand Down Expand Up @@ -73,4 +74,31 @@ public class ScheduleInfoRequest {
@NotNull(groups = UpdateValidation.class, message = "version cannot be null")
private Integer version;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ScheduleInfoRequest that = (ScheduleInfoRequest) o;
return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit,
that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend,
that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
&& Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version,
that.version);
}

@Override
public int hashCode() {
return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime,
delayTime,
selfDepend, taskParallelism, crontabExpression, version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;

import org.springframework.stereotype.Service;

@Service
public class NoopScheduleClient implements ScheduleEngineClient {

@Override
public boolean accept(String engineType) {
return ScheduleEngineType.NONE.getType().equals(engineType);
}

@Override
public boolean register(ScheduleInfo scheduleInfo) {
return true;
}

@Override
public boolean unregister(String groupId) {
return true;
}

@Override
public boolean update(ScheduleInfo scheduleInfo) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

@Service
public class ScheduleClientFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleClientFactory.class);

@Value("${inlong.schedule.engine:none}")
private String scheduleEngineName;

@Autowired
List<ScheduleEngineClient> scheduleEngineClients;

public ScheduleEngineClient getInstance() {
Optional<ScheduleEngineClient> optScheduleClient =
scheduleEngineClients.stream().filter(t -> t.accept(scheduleEngineName)).findFirst();
if (!optScheduleClient.isPresent()) {
LOGGER.warn("Schedule engine client not found for {} ", scheduleEngineName);
throw new BusinessException(ErrorCodeEnum.SCHEDULE_ENGINE_NOT_SUPPORTED,
String.format(ErrorCodeEnum.SCHEDULE_ENGINE_NOT_SUPPORTED.getMessage(), scheduleEngineName));
}
LOGGER.info("Get schedule engine client success for {}", scheduleEngineName);
return optScheduleClient.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public interface ScheduleEngine {

/**
* Handle schedule unregister.
* @param scheduleInfo schedule info to unregister
* @param groupId group to un-register schedule info
* */
boolean handleUnregister(ScheduleInfo scheduleInfo);
boolean handleUnregister(String groupId);

/**
* Handle schedule update.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
* */
public interface ScheduleEngineClient {

/**
* Check whether scheduleEngine type is matched.
* */
boolean accept(String engineType);

/**
* Register schedule to schedule engine.
* @param scheduleInfo schedule info to register
Expand All @@ -32,9 +37,10 @@ public interface ScheduleEngineClient {

/**
* Un-register schedule from schedule engine.
* @param scheduleInfo schedule info to unregister
* */
boolean unregister(ScheduleInfo scheduleInfo);
*
* @param groupId schedule info to unregister
*/
boolean unregister(String groupId);

/**
* Update schedule from schedule engine.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule;

import lombok.Getter;

@Getter
public enum ScheduleEngineType {

NONE("None"),
QUARTZ("Quartz");

private final String type;

ScheduleEngineType(String type) {
this.type = type;
}
}
Loading

0 comments on commit e3beb8c

Please sign in to comment.