Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
reele committed Nov 12, 2024
1 parent 4dc5a00 commit 8e95c9c
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.master.transportor.*;

@RpcService
public interface ILogicTaskInstanceOperator {
Expand All @@ -34,6 +33,6 @@ public interface ILogicTaskInstanceOperator {
LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest);

@RpcMethod
LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest);
LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

package org.apache.dolphinscheduler.extract.master.transportor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogicTaskTakeoverRequest implements Serializable {
public class LogicTaskTakeOverRequest implements Serializable {

private static final long serialVersionUID = -1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogicTaskTakeoverResponse {
public class LogicTaskTakeOverResponse {

private Integer taskInstanceId;

private boolean success;

private String message;

public static LogicTaskTakeoverResponse success(Integer taskInstanceId) {
return new LogicTaskTakeoverResponse(taskInstanceId, true, "takeover success");
public static LogicTaskTakeOverResponse success(Integer taskInstanceId) {
return new LogicTaskTakeOverResponse(taskInstanceId, true, "take over success");
}

public static LogicTaskTakeoverResponse failed(Integer taskInstanceId, String message) {
return new LogicTaskTakeoverResponse(taskInstanceId, false, message);
public static LogicTaskTakeOverResponse failed(Integer taskInstanceId, String message) {
return new LogicTaskTakeOverResponse(taskInstanceId, false, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ protected void assembleWorkflowInstance(
throw new IllegalArgumentException(
"The WorkflowFailoverCommandParam: " + command.getCommandParam() + " is invalid");
}
workflowInstance.setCommandType(command.getCommandType());
workflowInstance.addHistoryCmd(command.getCommandType());
workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus());
workflowInstanceDao.updateById(workflowInstance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse;
Expand Down Expand Up @@ -161,15 +160,17 @@ private void initializeTaskExecutionContext() {

private WorkflowInstance getValidSubWorkflowInstance() {

WorkflowInstanceRelation workflowInstanceRelation = applicationContext.getBean(WorkflowInstanceRelationMapper.class)
.queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId());
WorkflowInstanceRelation workflowInstanceRelation =
applicationContext.getBean(WorkflowInstanceRelationMapper.class)
.queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId());
if (workflowInstanceRelation == null || workflowInstanceRelation.getWorkflowInstanceId() == 0) {
return null;
}

WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class).queryDetailById(workflowInstanceRelation.getWorkflowInstanceId());
WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class)
.queryDetailById(workflowInstanceRelation.getWorkflowInstanceId());

if (workflowInstance == null || !workflowInstance.getState().canFailover()){
if (workflowInstance == null || !workflowInstance.getState().canFailover()) {
return null;
}

Expand All @@ -193,20 +194,19 @@ private boolean takeOverTaskFromExecutor() {
return false;
}
try {
final LogicTaskTakeoverRequest takeOverTaskRequest =
new LogicTaskTakeoverRequest(taskExecutionContext);
final LogicTaskTakeOverRequest takeOverTaskRequest =
new LogicTaskTakeOverRequest(taskExecutionContext);

final LogicTaskTakeoverResponse takeOverTaskResponse = Clients
final LogicTaskTakeOverResponse takeOverTaskResponse = Clients
.withService(ILogicTaskInstanceOperator.class)
.withHost(taskInstance.getHost())
.takeoverLogicTask(takeOverTaskRequest);
.takeOverLogicTask(takeOverTaskRequest);
return takeOverTaskResponse.isSuccess();
} catch (Exception ex) {
log.warn("Take over logic task: {} failed", taskInstance.getName(), ex);
return false;
}
}
else {
} else {
try {
final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder()
.taskInstanceId(taskInstance.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,26 @@

package org.apache.dolphinscheduler.server.master.rpc;

import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LogicITaskInstanceTakeoverOperationFunction
public class LogicITaskInstanceTakeOverOperationFunction
implements
ITaskInstanceOperationFunction<LogicTaskTakeoverRequest, LogicTaskTakeoverResponse> {
ITaskInstanceOperationFunction<LogicTaskTakeOverRequest, LogicTaskTakeOverResponse> {

@Autowired
private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;
Expand All @@ -47,7 +48,7 @@ public class LogicITaskInstanceTakeoverOperationFunction
private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager;

@Override
public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRequest) {
public LogicTaskTakeOverResponse operate(LogicTaskTakeOverRequest taskTakeoverRequest) {
log.info("Received dispatchLogicTask request: {}", taskTakeoverRequest);
TaskExecutionContext taskExecutionContext = taskTakeoverRequest.getTaskExecutionContext();
try {
Expand All @@ -66,12 +67,12 @@ public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRe
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
.createMasterTaskExecutor(taskExecutionContext);

if (masterTaskExecutorThreadPool.takeoverMasterTaskExecutor(masterTaskExecutor)) {
log.info("Takeover LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName);
return LogicTaskTakeoverResponse.success(taskInstanceId);
if (masterTaskExecutorThreadPool.takeOverMasterTaskExecutor(masterTaskExecutor)) {
log.info("Take over LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName);
return LogicTaskTakeOverResponse.success(taskInstanceId);
} else {
log.error("Takeover LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName);
return LogicTaskTakeoverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full");
log.error("Take over LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName);
return LogicTaskTakeOverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full");
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class LogicTaskInstanceOperationFunctionManager {
private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction;

@Autowired
private LogicITaskInstanceTakeoverOperationFunction logicITaskInstanceTakeoverOperationFunction;
private LogicITaskInstanceTakeOverOperationFunction logicITaskInstanceTakeOverOperationFunction;

public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() {
return logicITaskInstanceDispatchOperationFunction;
Expand All @@ -47,8 +47,8 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati
return logicITaskInstancePauseOperationFunction;
}

public LogicITaskInstanceTakeoverOperationFunction getLogicTaskInstanceTakeoverOperationFunction() {
return logicITaskInstanceTakeoverOperationFunction;
public LogicITaskInstanceTakeOverOperationFunction getLogicTaskInstanceTakeOverOperationFunction() {
return logicITaskInstanceTakeOverOperationFunction;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.rpc;

import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.transportor.*;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -51,9 +50,9 @@ public LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequ
}

@Override
public LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest) {
return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeoverOperationFunction()
.operate(taskTakeoverRequest);
public LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest) {
return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeOverOperationFunction()
.operate(taskTakeOverRequest);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute;

import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager;
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -51,13 +52,16 @@ public boolean submitMasterTaskExecutor(final MasterTaskExecutor masterTaskExecu
throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor);
}


public boolean takeoverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) {
public boolean takeOverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) {
if (!(masterTaskExecutor.getLogicTask() instanceof SubWorkflowLogicTask)) {
throw new IllegalArgumentException(
"Only SubWorkflowLogicTask can be take over: " + masterTaskExecutor.getLogicTask());
}
MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
// if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
// return masterSyncTaskExecutorThreadPool
// .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor);
// }
if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
return masterSyncTaskExecutorThreadPool
.submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor);
}
if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
return masterAsyncTaskExecutorThreadPool
.submitMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.runner.task.subworkflow;

import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand Down Expand Up @@ -71,14 +72,21 @@ public SubWorkflowLogicTask(final TaskExecutionContext taskExecutionContext,

@Override
public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() {
subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance();
upsertSubWorkflowRelation();
taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext));

applicationContext
.getBean(LogicTaskInstanceExecutionEventSenderManager.class)
.runningEventSender()
.sendMessage(taskExecutionContext);
// If the sub-workflow instance already exists and the workflow is in the failover process,
// take over the task without creating a new sub-workflow instance.
if (subWorkflowLogicTaskRuntimeContext == null
|| workflowExecutionRunnable.getWorkflowInstance()
.getCommandType() != CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {

subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance();
upsertSubWorkflowRelation();
taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext));

applicationContext
.getBean(LogicTaskInstanceExecutionEventSenderManager.class)
.runningEventSender()
.sendMessage(taskExecutionContext);
}

return new SubWorkflowAsyncTaskExecuteFunction(
subWorkflowLogicTaskRuntimeContext,
Expand Down

0 comments on commit 8e95c9c

Please sign in to comment.