From 5e5fdb832d6abf1b8c10f3bd1c141a58496347ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=B9=90?= Date: Fri, 10 Nov 2023 17:42:34 +0800 Subject: [PATCH] [Fix-15129][Dependent] Fix the ambiguity in date rules for dependent node. --- .../dao/mapper/TaskInstanceMapper.java | 12 ++---- .../dao/repository/ProcessInstanceDao.java | 9 ----- .../dao/repository/TaskInstanceDao.java | 11 +++--- .../impl/ProcessInstanceDaoImpl.java | 16 -------- .../repository/impl/TaskInstanceDaoImpl.java | 15 ++++---- .../dao/mapper/TaskInstanceMapper.xml | 14 +++---- .../resources/sql/dolphinscheduler_mysql.sql | 3 +- .../sql/dolphinscheduler_postgresql.sql | 1 + .../mysql/dolphinscheduler_ddl.sql | 4 ++ .../postgresql/dolphinscheduler_ddl.sql | 3 ++ .../server/master/utils/DependentExecute.java | 37 +++++++------------ 11 files changed, 46 insertions(+), 79 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index bcf6f056f862..ea09feb86ab3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -168,18 +168,14 @@ List loadAllInfosNoRelease(@Param("processInstanceId") int process * find last task instance list in the date interval * * @param taskCodes taskCodes - * @param startTime startTime - * @param endTime endTime * @param testFlag testFlag * @return task instance list */ - List findLastTaskInstances(@Param("taskCodes") Set taskCodes, - @Param("startTime") Date startTime, - @Param("endTime") Date endTime, + List findLastTaskInstances(@Param("processInstanceId") Integer processInstanceId, + @Param("taskCodes") Set taskCodes, @Param("testFlag") int testFlag); - TaskInstance findLastTaskInstance(@Param("taskCode") long depTaskCode, - @Param("startTime") Date startTime, - @Param("endTime") Date endTime, + TaskInstance findLastTaskInstance(@Param("processInstanceId") Integer processInstanceId, + @Param("taskCode") long depTaskCode, @Param("testFlag") int testFlag); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 91a29eacba9d..2ac9787137d2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -40,15 +40,6 @@ public interface ProcessInstanceDao extends IDao { */ ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); - /** - * find last manual process instance interval - * - * @param definitionCode process definition code - * @param dateInterval dateInterval - * @return process instance - */ - ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); - /** * query first schedule process instance * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index b5d41f878382..38d14ee2df30 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -91,20 +91,21 @@ public interface TaskInstanceDao extends IDao { /** * find last task instance list corresponding to taskCodes in the date interval * + * @param processInstanceId Task's parent process instance id * @param taskCodes taskCodes - * @param dateInterval dateInterval * @param testFlag test flag * @return task instance list */ - List queryLastTaskInstanceListIntervalByTaskCodes(Set taskCodes, DateInterval dateInterval, - int testFlag); + List queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId, + Set taskCodes, int testFlag); /** * find last task instance corresponding to taskCode in the date interval + * @param processInstanceId Task's parent process instance id * @param depTaskCode taskCode - * @param dateInterval dateInterval * @param testFlag test flag * @return task instance */ - TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval, int testFlag); + TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, + long depTaskCode, int testFlag); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 688556c2da31..a178c9e3fd24 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -69,22 +69,6 @@ public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, Da testFlag); } - /** - * find last manual process instance interval - * - * @param definitionCode process definition code - * @param dateInterval dateInterval - * @return process instance - */ - @Override - public ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, - int testFlag) { - return mybatisMapper.queryLastManualProcess(definitionCode, - dateInterval.getStartTime(), - dateInterval.getEndTime(), - testFlag); - } - /** * query first schedule process instance * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 9cbb92286c4c..a36fbd067f19 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -174,16 +174,15 @@ public List queryByWorkflowInstanceId(Integer workflowInstanceId) } @Override - public List queryLastTaskInstanceListIntervalByTaskCodes(Set taskCodes, - DateInterval dateInterval, int testFlag) { - return mybatisMapper.findLastTaskInstances(taskCodes, dateInterval.getStartTime(), dateInterval.getEndTime(), - testFlag); + public List queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId, + Set taskCodes, + int testFlag) { + return mybatisMapper.findLastTaskInstances(processInstanceId, taskCodes, testFlag); } @Override - public TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval, - int testFlag) { - return mybatisMapper.findLastTaskInstance(depTaskCode, dateInterval.getStartTime(), dateInterval.getEndTime(), - testFlag); + public TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, long depTaskCode, + int testFlag) { + return mybatisMapper.findLastTaskInstance(processInstanceId, depTaskCode, testFlag); } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index cd9c36ed1257..d085fe09b4a6 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -350,27 +350,25 @@ select task_code, max(end_time) as max_end_time from t_ds_task_instance where 1=1 and test_flag = #{testFlag} + and instance.process_instance_id = #{processInstanceId} and task_code in #{i} - - and start_time = ]]> #{startTime} and start_time #{endTime} - group by task_code ) t_max - on instance.task_code = t_max.task_code and instance.end_time = t_max.max_end_time + on instance.process_instance_id = t_max.process_instance_id + and instance.task_code = t_max.task_code + and instance.end_time = t_max.max_end_time diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 5840d998a64a..0d8a125fee71 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -930,7 +930,8 @@ CREATE TABLE `t_ds_task_instance` ( PRIMARY KEY (`id`), KEY `process_instance_id` (`process_instance_id`) USING BTREE, KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE, - KEY `idx_cache_key` (`cache_key`) USING BTREE + KEY `idx_cache_key` (`cache_key`) USING BTREE, + KEY `idx_pid_code_end_time` (`process_instance_id`, `task_code`, `end_time`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 3e5a8946bd11..273049f1155e 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -835,6 +835,7 @@ CREATE TABLE t_ds_task_instance ( create index idx_task_instance_code_version on t_ds_task_instance (task_code, task_definition_version); create index idx_cache_key on t_ds_task_instance (cache_key); +create index idx_task_instance_pid_code_end_time on t_ds_task_instance (process_instance_id, task_code, end_time); -- -- Table structure for table t_ds_tenant diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql index a7787e39e2aa..1c07d06018f2 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql @@ -101,3 +101,7 @@ d// delimiter ; CALL drop_t_ds_k8s_namespace_col_pod_request_memory; DROP PROCEDURE drop_t_ds_k8s_namespace_col_pod_request_memory; + + +CREATE INDEX IF NOT EXISTS idx_pid_code_end_time ON t_ds_task_instance (process_instance_id, task_code, end_time); +ANALYZE TABLE t_ds_task_instance; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql index 1c0af6a687b8..222b8a96cdd7 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql @@ -21,3 +21,6 @@ ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "limits_memory"; ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "pod_replicas"; ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "pod_request_cpu"; ALTER TABLE "t_ds_k8s_namespace" DROP COLUMN IF EXISTS "pod_request_memory"; + +create index if not exists idx_task_instance_pid_code_end_time on t_ds_task_instance (process_instance_id, task_code, end_time); +analyze t_ds_task_instance; \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index 195212ef34ac..5c1fd0a0b6f9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -158,10 +158,9 @@ private DependResult calculateResultForTasks(DependentItem dependentItem, if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_WORKFLOW_CODE) { result = dependResultByProcessInstance(processInstance); } else if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) { - result = dependResultByAllTaskOfProcessInstance(processInstance, dateInterval, testFlag); + result = dependResultByAllTaskOfProcessInstance(processInstance, testFlag); } else { - result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), dateInterval, - testFlag); + result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), testFlag); } if (result != DependResult.SUCCESS) { break; @@ -194,8 +193,7 @@ private DependResult dependResultByProcessInstance(ProcessInstance processInstan * * @return */ - private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, - DateInterval dateInterval, int testFlag) { + private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, int testFlag) { if (!processInstance.getState().isFinished()) { log.info("Wait for the dependent workflow to complete, processCode: {}, processInstanceId: {}.", processInstance.getProcessDefinitionCode(), processInstance.getId()); @@ -212,8 +210,8 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc .collect(Collectors.toMap(TaskDefinitionLog::getCode, TaskDefinitionLog::getName)); List taskInstanceList = - taskInstanceDao.queryLastTaskInstanceListIntervalByTaskCodes(taskDefinitionCodeMap.keySet(), - dateInterval, testFlag); + taskInstanceDao.queryLastTaskInstanceListIntervalInProcessInstance(processInstance.getId(), + taskDefinitionCodeMap.keySet(), testFlag); Map taskExecutionStatusMap = taskInstanceList.stream() .filter(taskInstance -> taskInstance.getTaskExecuteType() != TaskExecuteType.STREAM) @@ -245,14 +243,14 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc * * @param processInstance last process instance in the date interval * @param depTaskCode the dependent task code - * @param dateInterval date interval * @param testFlag test flag * @return depend result */ private DependResult dependResultBySingleTaskInstance(ProcessInstance processInstance, long depTaskCode, - DateInterval dateInterval, int testFlag) { + int testFlag) { TaskInstance taskInstance = - taskInstanceDao.queryLastTaskInstanceIntervalByTaskCode(depTaskCode, dateInterval, testFlag); + taskInstanceDao.queryLastTaskInstanceIntervalInProcessInstance(processInstance.getId(), + depTaskCode, testFlag); if (taskInstance == null) { TaskDefinition taskDefinition = taskDefinitionDao.queryByCode(depTaskCode); @@ -318,21 +316,12 @@ private void addItemVarPool(String varPoolStr, Long endTime) { */ private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { - ProcessInstance lastSchedulerProcess = - processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag); + // Task instance cannot run without process instance, + // we should only use `scheduleTime` to search for process instances, + // as `dateInterval` is calculated based on the `scheduleTime` of the + // process instance where the `dependent` node is located. - ProcessInstance lastManualProcess = - processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, testFlag); - - if (lastManualProcess == null) { - return lastSchedulerProcess; - } - if (lastSchedulerProcess == null) { - return lastManualProcess; - } - - // In the time range, there are both manual and scheduled workflow instances, return the last workflow instance - return lastManualProcess.getId() > lastSchedulerProcess.getId() ? lastManualProcess : lastSchedulerProcess; + return processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag); } /**