From 8e95c9c53fd3bdafbfff5a1eca2aef57e69701ac Mon Sep 17 00:00:00 2001 From: relee Date: Wed, 13 Nov 2024 00:34:45 +0800 Subject: [PATCH] Fix #16767 --- .../master/ILogicTaskInstanceOperator.java | 3 +-- ...est.java => LogicTaskTakeOverRequest.java} | 9 ++++--- ...se.java => LogicTaskTakeOverResponse.java} | 10 +++---- .../WorkflowFailoverCommandHandler.java | 2 ++ .../task/runnable/TaskExecutionRunnable.java | 26 +++++++++---------- ...askInstanceTakeOverOperationFunction.java} | 25 +++++++++--------- ...cTaskInstanceOperationFunctionManager.java | 6 ++--- .../rpc/LogicTaskInstanceOperatorImpl.java | 7 +++-- .../MasterTaskExecutorThreadPoolManager.java | 16 +++++++----- .../subworkflow/SubWorkflowLogicTask.java | 24 +++++++++++------ 10 files changed, 71 insertions(+), 57 deletions(-) rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/{LogicTaskTakeoverRequest.java => LogicTaskTakeOverRequest.java} (94%) rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/{LogicTaskTakeoverResponse.java => LogicTaskTakeOverResponse.java} (78%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/{LogicITaskInstanceTakeoverOperationFunction.java => LogicITaskInstanceTakeOverOperationFunction.java} (82%) diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index 448f49758312..0d3f6300e102 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -19,7 +19,6 @@ import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; -import org.apache.dolphinscheduler.extract.master.transportor.*; @RpcService public interface ILogicTaskInstanceOperator { @@ -34,6 +33,6 @@ public interface ILogicTaskInstanceOperator { LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest); @RpcMethod - LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest); + LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java similarity index 94% rename from dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java rename to dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java index 102641b00eec..3381c48f7f4d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java @@ -17,17 +17,18 @@ package org.apache.dolphinscheduler.extract.master.transportor; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import java.io.Serializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + @Data @NoArgsConstructor @AllArgsConstructor -public class LogicTaskTakeoverRequest implements Serializable { +public class LogicTaskTakeOverRequest implements Serializable { private static final long serialVersionUID = -1L; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java similarity index 78% rename from dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java rename to dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java index 62e6a052b2f8..e800fb09e5b3 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java @@ -24,7 +24,7 @@ @Data @NoArgsConstructor @AllArgsConstructor -public class LogicTaskTakeoverResponse { +public class LogicTaskTakeOverResponse { private Integer taskInstanceId; @@ -32,11 +32,11 @@ public class LogicTaskTakeoverResponse { private String message; - public static LogicTaskTakeoverResponse success(Integer taskInstanceId) { - return new LogicTaskTakeoverResponse(taskInstanceId, true, "takeover success"); + public static LogicTaskTakeOverResponse success(Integer taskInstanceId) { + return new LogicTaskTakeOverResponse(taskInstanceId, true, "take over success"); } - public static LogicTaskTakeoverResponse failed(Integer taskInstanceId, String message) { - return new LogicTaskTakeoverResponse(taskInstanceId, false, message); + public static LogicTaskTakeOverResponse failed(Integer taskInstanceId, String message) { + return new LogicTaskTakeOverResponse(taskInstanceId, false, message); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index 506fcdffb72a..a5999e293659 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -93,6 +93,8 @@ protected void assembleWorkflowInstance( throw new IllegalArgumentException( "The WorkflowFailoverCommandParam: " + command.getCommandParam() + " is invalid"); } + workflowInstance.setCommandType(command.getCommandType()); + workflowInstance.addHistoryCmd(command.getCommandType()); workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus()); workflowInstanceDao.updateById(workflowInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java index f0422d5ccce8..345f33efcda7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java @@ -20,13 +20,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; @@ -161,15 +160,17 @@ private void initializeTaskExecutionContext() { private WorkflowInstance getValidSubWorkflowInstance() { - WorkflowInstanceRelation workflowInstanceRelation = applicationContext.getBean(WorkflowInstanceRelationMapper.class) - .queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId()); + WorkflowInstanceRelation workflowInstanceRelation = + applicationContext.getBean(WorkflowInstanceRelationMapper.class) + .queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId()); if (workflowInstanceRelation == null || workflowInstanceRelation.getWorkflowInstanceId() == 0) { return null; } - WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class).queryDetailById(workflowInstanceRelation.getWorkflowInstanceId()); + WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class) + .queryDetailById(workflowInstanceRelation.getWorkflowInstanceId()); - if (workflowInstance == null || !workflowInstance.getState().canFailover()){ + if (workflowInstance == null || !workflowInstance.getState().canFailover()) { return null; } @@ -193,20 +194,19 @@ private boolean takeOverTaskFromExecutor() { return false; } try { - final LogicTaskTakeoverRequest takeOverTaskRequest = - new LogicTaskTakeoverRequest(taskExecutionContext); + final LogicTaskTakeOverRequest takeOverTaskRequest = + new LogicTaskTakeOverRequest(taskExecutionContext); - final LogicTaskTakeoverResponse takeOverTaskResponse = Clients + final LogicTaskTakeOverResponse takeOverTaskResponse = Clients .withService(ILogicTaskInstanceOperator.class) .withHost(taskInstance.getHost()) - .takeoverLogicTask(takeOverTaskRequest); + .takeOverLogicTask(takeOverTaskRequest); return takeOverTaskResponse.isSuccess(); } catch (Exception ex) { log.warn("Take over logic task: {} failed", taskInstance.getName(), ex); return false; } - } - else { + } else { try { final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() .taskInstanceId(taskInstance.getId()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java similarity index 82% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java index 62765e859795..35432d54ab24 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java @@ -17,25 +17,26 @@ package org.apache.dolphinscheduler.server.master.rpc; -import lombok.extern.slf4j.Slf4j; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; + +import lombok.extern.slf4j.Slf4j; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component -public class LogicITaskInstanceTakeoverOperationFunction +public class LogicITaskInstanceTakeOverOperationFunction implements - ITaskInstanceOperationFunction { + ITaskInstanceOperationFunction { @Autowired private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder; @@ -47,7 +48,7 @@ public class LogicITaskInstanceTakeoverOperationFunction private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; @Override - public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRequest) { + public LogicTaskTakeOverResponse operate(LogicTaskTakeOverRequest taskTakeoverRequest) { log.info("Received dispatchLogicTask request: {}", taskTakeoverRequest); TaskExecutionContext taskExecutionContext = taskTakeoverRequest.getTaskExecutionContext(); try { @@ -66,12 +67,12 @@ public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRe .createMasterTaskExecutorFactory(taskExecutionContext.getTaskType()) .createMasterTaskExecutor(taskExecutionContext); - if (masterTaskExecutorThreadPool.takeoverMasterTaskExecutor(masterTaskExecutor)) { - log.info("Takeover LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); - return LogicTaskTakeoverResponse.success(taskInstanceId); + if (masterTaskExecutorThreadPool.takeOverMasterTaskExecutor(masterTaskExecutor)) { + log.info("Take over LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); + return LogicTaskTakeOverResponse.success(taskInstanceId); } else { - log.error("Takeover LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); - return LogicTaskTakeoverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); + log.error("Take over LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); + return LogicTaskTakeOverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); } } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java index 89ed27953a82..fe5e8a1704dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java @@ -33,7 +33,7 @@ public class LogicTaskInstanceOperationFunctionManager { private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction; @Autowired - private LogicITaskInstanceTakeoverOperationFunction logicITaskInstanceTakeoverOperationFunction; + private LogicITaskInstanceTakeOverOperationFunction logicITaskInstanceTakeOverOperationFunction; public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() { return logicITaskInstanceDispatchOperationFunction; @@ -47,8 +47,8 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati return logicITaskInstancePauseOperationFunction; } - public LogicITaskInstanceTakeoverOperationFunction getLogicTaskInstanceTakeoverOperationFunction() { - return logicITaskInstanceTakeoverOperationFunction; + public LogicITaskInstanceTakeOverOperationFunction getLogicTaskInstanceTakeOverOperationFunction() { + return logicITaskInstanceTakeOverOperationFunction; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index 61917cba8b9a..faa65d6223ce 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.*; import lombok.extern.slf4j.Slf4j; @@ -51,9 +50,9 @@ public LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequ } @Override - public LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest) { - return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeoverOperationFunction() - .operate(taskTakeoverRequest); + public LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest) { + return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeOverOperationFunction() + .operate(taskTakeOverRequest); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java index e4b10cc4a310..5c7f04d319a4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import lombok.extern.slf4j.Slf4j; @@ -51,13 +52,16 @@ public boolean submitMasterTaskExecutor(final MasterTaskExecutor masterTaskExecu throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); } - - public boolean takeoverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { + public boolean takeOverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { + if (!(masterTaskExecutor.getLogicTask() instanceof SubWorkflowLogicTask)) { + throw new IllegalArgumentException( + "Only SubWorkflowLogicTask can be take over: " + masterTaskExecutor.getLogicTask()); + } MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor); -// if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { -// return masterSyncTaskExecutorThreadPool -// .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); -// } + if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { + return masterSyncTaskExecutorThreadPool + .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); + } if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) { return masterAsyncTaskExecutorThreadPool .submitMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index 2a9595f43bf2..6e923ccbcc2c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task.subworkflow; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; @@ -71,14 +72,21 @@ public SubWorkflowLogicTask(final TaskExecutionContext taskExecutionContext, @Override public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() { - subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance(); - upsertSubWorkflowRelation(); - taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext)); - - applicationContext - .getBean(LogicTaskInstanceExecutionEventSenderManager.class) - .runningEventSender() - .sendMessage(taskExecutionContext); + // if the sub-workflow instance is already exists and in fail over process, + // there should take over the task without create a new sub-workflow instance + if (subWorkflowLogicTaskRuntimeContext == null + || workflowExecutionRunnable.getWorkflowInstance() + .getCommandType() != CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) { + + subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance(); + upsertSubWorkflowRelation(); + taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext)); + + applicationContext + .getBean(LogicTaskInstanceExecutionEventSenderManager.class) + .runningEventSender() + .sendMessage(taskExecutionContext); + } return new SubWorkflowAsyncTaskExecuteFunction( subWorkflowLogicTaskRuntimeContext,