Skip to content

Commit

Permalink
Merge pull request #351 from WeDataSphere/dev-1.1.0-perform-optimize
Browse files Browse the repository at this point in the history
Dev 1.1.0 perform optimize
  • Loading branch information
Davidhua1996 authored Sep 2, 2022
2 parents 51da355 + 19a4165 commit ab998cf
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public void onDelete(TaskDeleteEvent deleteEvent) {
// Ignore
}

@Override
public void onDequeue(TaskDequeueEvent dequeueEvent) throws ExchangisOnEventException {
//Ignore
}

@Override
public void onProgressUpdate(TaskProgressUpdateEvent updateEvent) {
// Ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ default void onEvent(TaskExecutionEvent event) throws ExchangisOnEventException{
onDelete((TaskDeleteEvent)event);
} else if (event instanceof TaskProgressUpdateEvent){
onProgressUpdate((TaskProgressUpdateEvent)event);
} else if (event instanceof TaskDequeueEvent){
onDequeue((TaskDequeueEvent) event);
}
}

Expand Down Expand Up @@ -52,9 +54,17 @@ default void onEvent(TaskExecutionEvent event) throws ExchangisOnEventException{
*/
void onDelete(TaskDeleteEvent deleteEvent) throws ExchangisOnEventException;

/**
* Dequeue event
* @param dequeueEvent dequeue event
* @throws ExchangisOnEventException exception
*/
void onDequeue(TaskDequeueEvent dequeueEvent) throws ExchangisOnEventException;

/**
* Progress update
* @param updateEvent update event
*/
void onProgressUpdate(TaskProgressUpdateEvent updateEvent) throws ExchangisOnEventException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.webank.wedatasphere.exchangis.job.server.execution.events;


/**
* Event that remove the launchable task from the queue(table)
*/
public class TaskDequeueEvent extends TaskExecutionEvent{
/**
* Task id
*/
private String taskId;
/**
* @param taskId task id
*/
public TaskDequeueEvent(String taskId) {
super(null);
this.taskId = taskId;
}

public String getTaskId() {
return taskId;
}

public void setTaskId(String taskId) {
this.taskId = taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.scheduler.Scheduler;
import org.apache.linkis.scheduler.SchedulerContext;
import org.apache.linkis.scheduler.queue.ConsumerManager;
import org.apache.linkis.scheduler.queue.GroupFactory;
import org.apache.linkis.scheduler.queue.SchedulerEventState;
Expand Down Expand Up @@ -55,7 +56,6 @@ public FlexibleTenancyLoadBalancer(Scheduler scheduler, TaskManager<LaunchedExch
@Override
protected LoadBalanceSchedulerTask<LaunchedExchangisTask> choose(LaunchedExchangisTask launchedExchangisTask, Class<?> schedulerTaskClass, boolean unchecked) {
if( !unchecked || isSuitableClass(schedulerTaskClass)){
String schedulerTaskName = schedulerTaskClass.getSimpleName();
// Fetch the latest info
launchedExchangisTask = getTaskManager().getRunningTask(launchedExchangisTask.getTaskId());
// If the value is None means that the task is ended
Expand All @@ -71,29 +71,8 @@ protected LoadBalanceSchedulerTask<LaunchedExchangisTask> choose(LaunchedExchang
if (StringUtils.isBlank(tenancy)) {
tenancy = TenancyParallelGroupFactory.DEFAULT_TENANCY;
}
String finalTenancy = tenancy;
SchedulerTaskContainer schedulerTaskContainer =tenancySchedulerTasks.compute(tenancy + "_" + schedulerTaskName,(key, taskContainer) -> {
if (Objects.isNull(taskContainer)){
LoadBalanceSchedulerTask<LaunchedExchangisTask> headSchedulerTask = createLoadBalanceSchedulerTask(schedulerTaskClass);
if (headSchedulerTask instanceof AbstractLoadBalanceSchedulerTask){
((AbstractLoadBalanceSchedulerTask<LaunchedExchangisTask>) headSchedulerTask)
.setSchedulerLoadBalancer(FlexibleTenancyLoadBalancer.this);
}
headSchedulerTask.setTenancy(finalTenancy);
try {
getScheduler().submit(headSchedulerTask);
} catch (Exception e){
// Only if not enough reserved threads in scheduler
throw new ExchangisTaskExecuteException.Runtime("If there is no enough reserved threads in scheduler for tenancy: [" + finalTenancy
+ "], load balance scheduler task: [" + schedulerTaskName + "]? please invoke setInitResidentThreads(num) method in consumerManager", e);
}
taskContainer = new SchedulerTaskContainer(headSchedulerTask);
taskContainer.tenancy = finalTenancy;
}
return taskContainer;
});
// Select one
return schedulerTaskContainer.select();
return geOrCreateSchedulerTaskContainer(tenancy, schedulerTaskClass).select();
}

}
Expand Down Expand Up @@ -137,6 +116,7 @@ private LoadBalanceSchedulerTask<LaunchedExchangisTask> createLoadBalanceSchedul
public void run() {
Thread.currentThread().setName("Balancer-Thread" + getName());
LOG.info("Thread:[ {} ] is started. ", Thread.currentThread().getName());
initLoadBalancerSchedulerTasks();
ConsumerManager consumerManager = getScheduler().getSchedulerContext().getOrCreateConsumerManager();
Map<String, ExecutorService> tenancyExecutorServices = new HashMap<>();
int residentThreads = 0;
Expand Down Expand Up @@ -273,6 +253,53 @@ public String getName() {
return this.getClass().getSimpleName();
}

/**
* Get or create scheduler task container
* @return container
*/
private SchedulerTaskContainer geOrCreateSchedulerTaskContainer(String tenancy, Class<?> schedulerTaskClass){
String schedulerTaskName = schedulerTaskClass.getSimpleName();
return tenancySchedulerTasks.compute(tenancy + "_" + schedulerTaskName,(key, taskContainer) -> {
if (Objects.isNull(taskContainer)){
LoadBalanceSchedulerTask<LaunchedExchangisTask> headSchedulerTask = createLoadBalanceSchedulerTask(schedulerTaskClass);
if (headSchedulerTask instanceof AbstractLoadBalanceSchedulerTask){
((AbstractLoadBalanceSchedulerTask<LaunchedExchangisTask>) headSchedulerTask)
.setSchedulerLoadBalancer(FlexibleTenancyLoadBalancer.this);
}
headSchedulerTask.setTenancy(tenancy);
try {
getScheduler().submit(headSchedulerTask);
} catch (Exception e){
// Only if not enough reserved threads in scheduler
throw new ExchangisTaskExecuteException.Runtime("If there is no enough reserved threads in scheduler for tenancy: [" + tenancy
+ "], load balance scheduler task: [" + schedulerTaskName + "]? please invoke setInitResidentThreads(num) method in consumerManager", e);
}
taskContainer = new SchedulerTaskContainer(headSchedulerTask);
taskContainer.tenancy = tenancy;
LOG.info("Create scheduler task container[ tenancy: {}, load balance scheduler task: {} ]", tenancy, schedulerTaskName);
}
return taskContainer;
});
}

/**
* Init to pre create task container for load balancer scheduler tasks
*/
private void initLoadBalancerSchedulerTasks(){
SchedulerContext schedulerContext = getScheduler().getSchedulerContext();
if (schedulerContext instanceof ExchangisSchedulerContext){
Optional.ofNullable(((ExchangisSchedulerContext)schedulerContext).getTenancies()).ifPresent(tenancies -> {
tenancies.forEach(tenancy -> {
// Skip the system tenancy
if (!tenancy.startsWith(".")) {
for (Class<?> registeredTaskClass : registeredTaskClasses) {
geOrCreateSchedulerTaskContainer(tenancy, registeredTaskClass);
}
}
});
});
}
}
static class LoopCounter {

AtomicInteger containers = new AtomicInteger(0);
Expand All @@ -283,6 +310,7 @@ static class LoopCounter {

List<SchedulerTaskContainer> taskContainers = new ArrayList<>();
}

/**
* Scheduler
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.webank.wedatasphere.exchangis.job.server.execution.scheduler;

import org.apache.commons.lang3.StringUtils;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.scheduler.AbstractScheduler;
import org.apache.linkis.scheduler.SchedulerContext;
Expand All @@ -8,6 +9,10 @@
import org.apache.linkis.scheduler.queue.GroupFactory;
import org.apache.linkis.scheduler.queue.fifoqueue.FIFOSchedulerContextImpl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Inherited the AbstractScheduler from linkis-scheduler
*/
Expand All @@ -17,7 +22,15 @@ private static class Constraints{

private static final CommonVars<Integer> MAX_PARALLEL_PER_TENANCY = CommonVars.apply("wds.exchangis.job.scheduler.consumer.max.parallel.per-tenancy", 1);

private static final CommonVars<String> TENANCY_PATTERN = CommonVars.apply("wds.exchangis.job.scheduler.consumer.tenancies", "hadoop,log");
/**
* System tenancies
*/
private static final CommonVars<String> SYSTEM_TENANCY_PATTERN = CommonVars.apply("wds.exchangis.job.scheduler.consumer.tenancies-system", ".log");

/**
* Custom tenancies
*/
private static final CommonVars<String> CUSTOM_TENANCY_PATTERN = CommonVars.apply("wds.exchangis.job.scheduler.consumer.tenancies", "hadoop");

private static final CommonVars<Integer> GROUP_INIT_CAPACITY = CommonVars.apply("wds.exchangis.job.scheduler.group.min.capacity", 1000);

Expand All @@ -40,7 +53,16 @@ public ExchangisGenericScheduler(ExecutorManager executorManager, ConsumerManage

@Override
public void init() {
this.schedulerContext = new ExchangisSchedulerContext(Constraints.MAX_PARALLEL_PER_TENANCY.getValue(), Constraints.TENANCY_PATTERN.getValue());
List<String> tenancies = new ArrayList<>();
String sysTenancies = Constraints.SYSTEM_TENANCY_PATTERN.getValue();
if (StringUtils.isNotBlank(sysTenancies)){
tenancies.addAll(Arrays.asList(sysTenancies.split(",")));
}
String customTenancies = Constraints.CUSTOM_TENANCY_PATTERN.getValue();
if (StringUtils.isNotBlank(customTenancies)){
tenancies.addAll(Arrays.asList(customTenancies.split(",")));
}
this.schedulerContext = new ExchangisSchedulerContext(Constraints.MAX_PARALLEL_PER_TENANCY.getValue(), tenancies);
GroupFactory groupFactory = this.schedulerContext.getOrCreateGroupFactory();
if (groupFactory instanceof TenancyParallelGroupFactory){
TenancyParallelGroupFactory tenancyParallelGroupFactory = (TenancyParallelGroupFactory)groupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* Contains the executorManager, consumerManager and groupFactory
Expand All @@ -17,22 +18,20 @@ public class ExchangisSchedulerContext extends FIFOSchedulerContextImpl {
/**
* Tenancy list
*/
private String tenancies;
private final List<String> tenancies;

private int maxParallelismPerUser = 1;
public ExchangisSchedulerContext(int maxParallelismPerUser, String tenancies) {
public ExchangisSchedulerContext(int maxParallelismPerUser, List<String> tenancies) {
super(Integer.MAX_VALUE);
this.maxParallelismPerUser = maxParallelismPerUser;
if (StringUtils.isNotBlank(tenancies)){
this.tenancies = tenancies;
}
this.tenancies = tenancies;
}

@Override
public GroupFactory createGroupFactory() {
TenancyParallelGroupFactory parallelGroupFactory = new TenancyParallelGroupFactory();
parallelGroupFactory.setParallelPerTenancy(maxParallelismPerUser);
parallelGroupFactory.setTenancies(StringUtils.isNotBlank(tenancies)? Arrays.asList(tenancies.split(",")) : Collections.emptyList());
parallelGroupFactory.setTenancies(this.tenancies);
return parallelGroupFactory;
}

Expand All @@ -41,4 +40,7 @@ public ConsumerManager createConsumerManager() {
throw new ExchangisSchedulerException.Runtime("Must set the consumer manager before scheduling", null);
}

public List<String> getTenancies() {
return tenancies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.webank.wedatasphere.exchangis.job.server.exception.ExchangisSchedulerRetryException;
import com.webank.wedatasphere.exchangis.job.server.execution.AbstractTaskManager;
import com.webank.wedatasphere.exchangis.job.server.execution.TaskManager;
import com.webank.wedatasphere.exchangis.job.server.execution.events.TaskDequeueEvent;
import com.webank.wedatasphere.exchangis.job.server.execution.events.TaskExecutionEvent;
import com.webank.wedatasphere.exchangis.job.server.execution.events.TaskDeleteEvent;
import com.webank.wedatasphere.exchangis.job.server.execution.events.TaskStatusUpdateEvent;
Expand Down Expand Up @@ -96,6 +97,8 @@ protected void schedule() throws ExchangisSchedulerException, ExchangisScheduler
info(jobExecutionId, "Launch task:[name:{} ,id:{}] fail, possible reason is: [{}]",
launchableExchangisTask.getName(), launchableExchangisTask.getId(), getActualCause(e).getMessage());
if (retryCnt.incrementAndGet() < getMaxRetryNum()) {
// Remove the launched task stored
onEvent(new TaskDeleteEvent(String.valueOf(launchableExchangisTask.getId())));
throw new ExchangisSchedulerRetryException("Error occurred in invoking launching method for task: [" + launchableExchangisTask.getId() +"]", e);
}else {
// Update the launched task status to fail
Expand Down Expand Up @@ -126,12 +129,20 @@ protected void schedule() throws ExchangisSchedulerException, ExchangisScheduler
}
});
}
if (successAdd && Objects.nonNull(this.loadBalancer)) {
// Add the launchedExchangisTask to the load balance poller
List<LoadBalanceSchedulerTask<LaunchedExchangisTask>> loadBalanceSchedulerTasks = this.loadBalancer.choose(launchedExchangisTask);
Optional.ofNullable(loadBalanceSchedulerTasks).ifPresent(tasks -> tasks.forEach(loadBalanceSchedulerTask -> {
loadBalanceSchedulerTask.getOrCreateLoadBalancePoller().push(launchedExchangisTask);
}));
if (successAdd){
try {
onEvent(new TaskDequeueEvent(launchableExchangisTask.getId() + ""));
}catch (Exception e){
// Ignore the exception
LOG.warn("Fail to dequeue the launchable task [{}]", launchableExchangisTask.getId(), e);
}
if (Objects.nonNull(this.loadBalancer)){
// Add the launchedExchangisTask to the load balance poller
List<LoadBalanceSchedulerTask<LaunchedExchangisTask>> loadBalanceSchedulerTasks = this.loadBalancer.choose(launchedExchangisTask);
Optional.ofNullable(loadBalanceSchedulerTasks).ifPresent(tasks -> tasks.forEach(loadBalanceSchedulerTask -> {
loadBalanceSchedulerTask.getOrCreateLoadBalancePoller().push(launchedExchangisTask);
}));
}
}
}
}
Expand Down Expand Up @@ -184,7 +195,6 @@ public String getName() {
return "Scheduler-SubmitTask-" + getId();
}


@Override
public JobInfo getJobInfo() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
</insert>

<delete id="deleteLaunchableTask">
delete * from
delete from
<include refid="TableName"/>
where id = #{taskId}
</delete>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.webank.wedatasphere.exchangis.job.launcher.domain.LaunchedExchangisTask;
import com.webank.wedatasphere.exchangis.job.launcher.domain.task.TaskStatus;
import com.webank.wedatasphere.exchangis.job.launcher.entity.LaunchedExchangisJobEntity;
import com.webank.wedatasphere.exchangis.job.server.mapper.LaunchableTaskDao;
import com.webank.wedatasphere.exchangis.job.server.mapper.LaunchedJobDao;
import com.webank.wedatasphere.exchangis.job.server.mapper.LaunchedTaskDao;
import com.webank.wedatasphere.exchangis.job.server.execution.events.*;
Expand Down Expand Up @@ -33,6 +34,9 @@ public class DefaultTaskExecuteService implements TaskExecuteService {
@Resource
private LaunchedJobDao launchedJobDao;

@Resource
private LaunchableTaskDao launchableTaskDao;

private TaskExecuteService selfService;

@Override
Expand Down Expand Up @@ -92,6 +96,12 @@ public void onDelete(TaskDeleteEvent deleteEvent) {
this.launchedTaskDao.deleteLaunchedTask(deleteEvent.getTaskId());
}

@Override
public void onDequeue(TaskDequeueEvent dequeueEvent) throws ExchangisOnEventException {
// Delete task in table
this.launchableTaskDao.deleteLaunchableTask(dequeueEvent.getTaskId());
}

@Override
public void onProgressUpdate(TaskProgressUpdateEvent updateEvent) throws ExchangisOnEventException {
LaunchedExchangisTask task = updateEvent.getLaunchedExchangisTask();
Expand Down
Loading

0 comments on commit ab998cf

Please sign in to comment.