Skip to content

Commit

Permalink
[Fix-15129][Dependent] Fix the ambiguity in date rules for dependent …
Browse files Browse the repository at this point in the history
…node.
  • Loading branch information
李乐 committed Nov 10, 2023
1 parent 65a7c7f commit 5e5fdb8
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,14 @@ List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int process
* find last task instance list in the date interval
*
* @param taskCodes taskCodes
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return task instance list
*/
List<TaskInstance> findLastTaskInstances(@Param("taskCodes") Set<Long> taskCodes,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
List<TaskInstance> findLastTaskInstances(@Param("processInstanceId") Integer processInstanceId,
@Param("taskCodes") Set<Long> taskCodes,
@Param("testFlag") int testFlag);

TaskInstance findLastTaskInstance(@Param("taskCode") long depTaskCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
TaskInstance findLastTaskInstance(@Param("processInstanceId") Integer processInstanceId,
@Param("taskCode") long depTaskCode,
@Param("testFlag") int testFlag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
*/
ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);

/**
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);

/**
* query first schedule process instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,21 @@ public interface TaskInstanceDao extends IDao<TaskInstance> {
/**
* find last task instance list corresponding to taskCodes in the date interval
*
* @param processInstanceId Task's parent process instance id
* @param taskCodes taskCodes
* @param dateInterval dateInterval
* @param testFlag test flag
* @return task instance list
*/
List<TaskInstance> queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> taskCodes, DateInterval dateInterval,
int testFlag);
List<TaskInstance> queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
Set<Long> taskCodes, int testFlag);

/**
* find last task instance corresponding to taskCode in the date interval
* @param processInstanceId Task's parent process instance id
* @param depTaskCode taskCode
* @param dateInterval dateInterval
* @param testFlag test flag
* @return task instance
*/
TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval, int testFlag);
TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId,
long depTaskCode, int testFlag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,6 @@ public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, Da
testFlag);
}

/**
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval,
int testFlag) {
return mybatisMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
}

/**
* query first schedule process instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,15 @@ public List<TaskInstance> queryByWorkflowInstanceId(Integer workflowInstanceId)
}

@Override
public List<TaskInstance> queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> taskCodes,
DateInterval dateInterval, int testFlag) {
return mybatisMapper.findLastTaskInstances(taskCodes, dateInterval.getStartTime(), dateInterval.getEndTime(),
testFlag);
public List<TaskInstance> queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
Set<Long> taskCodes,
int testFlag) {
return mybatisMapper.findLastTaskInstances(processInstanceId, taskCodes, testFlag);
}

@Override
public TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval,
int testFlag) {
return mybatisMapper.findLastTaskInstance(depTaskCode, dateInterval.getStartTime(), dateInterval.getEndTime(),
testFlag);
public TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, long depTaskCode,
int testFlag) {
return mybatisMapper.findLastTaskInstance(processInstanceId, depTaskCode, testFlag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,27 +350,25 @@
select task_code, max(end_time) as max_end_time
from t_ds_task_instance
where 1=1 and test_flag = #{testFlag}
and instance.process_instance_id = #{processInstanceId}
<if test="taskCodes != null and taskCodes.size() != 0">
and task_code in
<foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="startTime!=null and endTime != null">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
group by task_code
) t_max
on instance.task_code = t_max.task_code and instance.end_time = t_max.max_end_time
on instance.process_instance_id = t_max.process_instance_id
and instance.task_code = t_max.task_code
and instance.end_time = t_max.max_end_time
</select>
<select id="findLastTaskInstance" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where task_code = #{taskCode}
<if test="startTime!=null and endTime != null">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
where process_instance_id = #{processInstanceId}
and task_code = #{taskCode}
order by end_time desc limit 1
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,8 @@ CREATE TABLE `t_ds_task_instance` (
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
KEY `idx_cache_key` (`cache_key`) USING BTREE
KEY `idx_cache_key` (`cache_key`) USING BTREE,
KEY `idx_pid_code_end_time` (`process_instance_id`, `task_code`, `end_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;

-- ----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ CREATE TABLE t_ds_task_instance (

create index idx_task_instance_code_version on t_ds_task_instance (task_code, task_definition_version);
create index idx_cache_key on t_ds_task_instance (cache_key);
create index idx_task_instance_pid_code_end_time on t_ds_task_instance (process_instance_id, task_code, end_time);

--
-- Table structure for table t_ds_tenant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ d//
delimiter ;
CALL drop_t_ds_k8s_namespace_col_pod_request_memory;
DROP PROCEDURE drop_t_ds_k8s_namespace_col_pod_request_memory;


CREATE INDEX IF NOT EXISTS idx_pid_code_end_time ON t_ds_task_instance (process_instance_id, task_code, end_time);
ANALYZE TABLE t_ds_task_instance;
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "limits_memory";
ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "pod_replicas";
ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "pod_request_cpu";
ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "pod_request_memory";

create index if not exists idx_task_instance_pid_code_end_time on t_ds_task_instance (process_instance_id, task_code, end_time);
analyze t_ds_task_instance;
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ private DependResult calculateResultForTasks(DependentItem dependentItem,
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_WORKFLOW_CODE) {
result = dependResultByProcessInstance(processInstance);
} else if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByAllTaskOfProcessInstance(processInstance, dateInterval, testFlag);
result = dependResultByAllTaskOfProcessInstance(processInstance, testFlag);
} else {
result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), dateInterval,
testFlag);
result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), testFlag);
}
if (result != DependResult.SUCCESS) {
break;
Expand Down Expand Up @@ -194,8 +193,7 @@ private DependResult dependResultByProcessInstance(ProcessInstance processInstan
*
* @return
*/
private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance,
DateInterval dateInterval, int testFlag) {
private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, int testFlag) {
if (!processInstance.getState().isFinished()) {
log.info("Wait for the dependent workflow to complete, processCode: {}, processInstanceId: {}.",
processInstance.getProcessDefinitionCode(), processInstance.getId());
Expand All @@ -212,8 +210,8 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc
.collect(Collectors.toMap(TaskDefinitionLog::getCode, TaskDefinitionLog::getName));

List<TaskInstance> taskInstanceList =
taskInstanceDao.queryLastTaskInstanceListIntervalByTaskCodes(taskDefinitionCodeMap.keySet(),
dateInterval, testFlag);
taskInstanceDao.queryLastTaskInstanceListIntervalInProcessInstance(processInstance.getId(),
taskDefinitionCodeMap.keySet(), testFlag);
Map<Long, TaskExecutionStatus> taskExecutionStatusMap =
taskInstanceList.stream()
.filter(taskInstance -> taskInstance.getTaskExecuteType() != TaskExecuteType.STREAM)
Expand Down Expand Up @@ -245,14 +243,14 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc
*
* @param processInstance last process instance in the date interval
* @param depTaskCode the dependent task code
* @param dateInterval date interval
* @param testFlag test flag
* @return depend result
*/
private DependResult dependResultBySingleTaskInstance(ProcessInstance processInstance, long depTaskCode,
DateInterval dateInterval, int testFlag) {
int testFlag) {
TaskInstance taskInstance =
taskInstanceDao.queryLastTaskInstanceIntervalByTaskCode(depTaskCode, dateInterval, testFlag);
taskInstanceDao.queryLastTaskInstanceIntervalInProcessInstance(processInstance.getId(),
depTaskCode, testFlag);

if (taskInstance == null) {
TaskDefinition taskDefinition = taskDefinitionDao.queryByCode(depTaskCode);
Expand Down Expand Up @@ -318,21 +316,12 @@ private void addItemVarPool(String varPoolStr, Long endTime) {
*/
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {

ProcessInstance lastSchedulerProcess =
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
// Task instance cannot run without process instance,
// we should only use `scheduleTime` to search for process instances,
// as `dateInterval` is calculated based on the `scheduleTime` of the
// process instance where the `dependent` node is located.

ProcessInstance lastManualProcess =
processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, testFlag);

if (lastManualProcess == null) {
return lastSchedulerProcess;
}
if (lastSchedulerProcess == null) {
return lastManualProcess;
}

// In the time range, there are both manual and scheduled workflow instances, return the last workflow instance
return lastManualProcess.getId() > lastSchedulerProcess.getId() ? lastManualProcess : lastSchedulerProcess;
return processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
}

/**
Expand Down

0 comments on commit 5e5fdb8

Please sign in to comment.