Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pd-store): reset previous unexpected cherry-pick for TaskManager #2511

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public final class TaskManager {
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";

public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";

protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
private static final int THREADS = 4;
Expand All @@ -60,6 +65,11 @@ public final class TaskManager {
private final ExecutorService serverInfoDbExecutor;
private final PausableScheduledThreadPool schedulerExecutor;

private final ExecutorService schemaTaskExecutor;
private final ExecutorService olapTaskExecutor;
private final ExecutorService ephemeralTaskExecutor;
private final PausableScheduledThreadPool distributedSchedulerExecutor;

private boolean enableRoleElected = false;

public static TaskManager instance() {
Expand All @@ -76,6 +86,17 @@ private TaskManager(int pool) {
1, TASK_DB_WORKER);
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);

this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
SCHEMA_TASK_WORKER);
this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
OLAP_TASK_WORKER);
this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
EPHEMERAL_TASK_WORKER);
this.distributedSchedulerExecutor =
ExecutorUtil.newPausableScheduledThreadPool(1,
DISTRIBUTED_TASK_SCHEDULER);

// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
Expand All @@ -88,11 +109,36 @@ private TaskManager(int pool) {

public void addScheduler(HugeGraphParams graph) {
E.checkArgumentNotNull(graph, "The graph can't be null");

TaskScheduler scheduler = new StandardTaskScheduler(graph,
this.taskExecutor, this.taskDbExecutor,
this.serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
LOG.info("Use {} as the scheduler of graph ({})",
graph.schedulerType(), graph.name());
// TODO: If the current server is bound to a specific non-DEFAULT graph space, don't create task schedulers for graphs outside that graph space
switch (graph.schedulerType()) {
case "distributed": {
TaskScheduler scheduler =
new DistributedTaskScheduler(
graph,
distributedSchedulerExecutor,
taskDbExecutor,
schemaTaskExecutor,
olapTaskExecutor,
taskExecutor, /* gremlinTaskExecutor */
ephemeralTaskExecutor,
serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
break;
}
case "local":
default: {
TaskScheduler scheduler =
new StandardTaskScheduler(
graph,
this.taskExecutor,
this.taskDbExecutor,
this.serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
break;
}
}
}

public void closeScheduler(HugeGraphParams graph) {
Expand Down Expand Up @@ -123,6 +169,10 @@ public void closeScheduler(HugeGraphParams graph) {
if (!this.schedulerExecutor.isTerminated()) {
this.closeSchedulerTx(graph);
}

if (!this.distributedSchedulerExecutor.isTerminated()) {
this.closeDistributedSchedulerTx(graph);
}
}

private void closeTaskTx(HugeGraphParams graph) {
Expand Down Expand Up @@ -157,6 +207,21 @@ private void closeSchedulerTx(HugeGraphParams graph) {
}
}

/**
 * Closes the graph transaction from inside the distributed scheduler
 * executor and blocks until that close task has finished.
 *
 * <p>The close is submitted as a task (instead of being called directly)
 * so that {@code graph.closeTx()} runs on an executor thread rather than
 * on the caller's thread.
 *
 * @param graph the graph whose transaction should be closed
 * @throws HugeException if the close task fails, or if the calling thread
 *                       is interrupted while waiting for it to finish
 */
private void closeDistributedSchedulerTx(HugeGraphParams graph) {
    final Callable<Void> closeTx = () -> {
        // Do close-tx for current thread
        graph.closeTx();
        // Let other threads run
        Thread.yield();
        return null;
    };
    try {
        this.distributedSchedulerExecutor.submit(closeTx).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still see it
        Thread.currentThread().interrupt();
        throw new HugeException("Exception when closing scheduler tx", e);
    } catch (Exception e) {
        throw new HugeException("Exception when closing scheduler tx", e);
    }
}

public void pauseScheduledThreadPool() {
this.schedulerExecutor.pauseSchedule();
}
Expand All @@ -170,8 +235,7 @@ public TaskScheduler getScheduler(HugeGraphParams graph) {
}

public ServerInfoManager getServerInfoManager(HugeGraphParams graph) {
StandardTaskScheduler scheduler = (StandardTaskScheduler)
this.getScheduler(graph);
TaskScheduler scheduler = this.getScheduler(graph);
if (scheduler == null) {
return null;
}
Expand All @@ -195,10 +259,21 @@ public void shutdown(long timeout) {
}
}

if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
this.distributedSchedulerExecutor.shutdown();
try {
terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.taskExecutor.isShutdown()) {
this.taskExecutor.shutdown();
try {
terminated = this.taskExecutor.awaitTermination(timeout, unit);
terminated = this.taskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -217,7 +292,38 @@ public void shutdown(long timeout) {
if (terminated && !this.taskDbExecutor.isShutdown()) {
this.taskDbExecutor.shutdown();
try {
terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
terminated = this.taskDbExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
this.ephemeralTaskExecutor.shutdown();
try {
terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.schemaTaskExecutor.isShutdown()) {
this.schemaTaskExecutor.shutdown();
try {
terminated = this.schemaTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.olapTaskExecutor.isShutdown()) {
this.olapTaskExecutor.shutdown();
try {
terminated = this.olapTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
Expand Down Expand Up @@ -292,7 +398,7 @@ private void scheduleOrExecuteJob() {
// Called by scheduler timer
try {
for (TaskScheduler entry : this.schedulers.values()) {
StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
TaskScheduler scheduler = entry;
// Maybe other thread close&remove scheduler at the same time
synchronized (scheduler) {
this.scheduleOrExecuteJobForGraph(scheduler);
Expand All @@ -303,56 +409,59 @@ private void scheduleOrExecuteJob() {
}
}

private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) {
private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) {
E.checkNotNull(scheduler, "scheduler");

ServerInfoManager serverManager = scheduler.serverManager();
String graph = scheduler.graphName();

LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
try {
/*
* Skip if:
* graph is closed (iterate schedulers before graph is closing)
* or
* graph is not initialized (maybe truncated or cleared).
*
* If the graph is being closed by another thread, the current thread
* may get the serverManager and try to lock the graph while the other
* thread deletes the lock-group; the current thread would then get the
* exception 'LockGroup xx does not exists'.
* If the graph is closed, don't call serverManager.initialized()
* because it will reopen the graph tx.
*/
if (!serverManager.graphIsReady()) {
return;
}

// Update server heartbeat
serverManager.heartbeat();
if (scheduler instanceof StandardTaskScheduler) {
StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler);
ServerInfoManager serverManager = scheduler.serverManager();
String graph = scheduler.graphName();

/*
* Master will schedule tasks to suitable servers.
* Note a Worker may become a Master, so an elected-Master also needs to
* execute tasks assigned by the previous Master when enableRoleElected=true.
* However, when enableRoleElected=false, a Master is only set by the
* config assignment, so an assigned-Master always stays in the same state.
*/
if (serverManager.selfIsMaster()) {
scheduler.scheduleTasksOnMaster();
if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
// assigned-Master + non-single-node don't need to execute tasks
LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
try {
/*
* Skip if:
* graph is closed (iterate schedulers before graph is closing)
* or
* graph is not initialized (maybe truncated or cleared).
*
* If the graph is being closed by another thread, the current thread
* may get the serverManager and try to lock the graph while the other
* thread deletes the lock-group; the current thread would then get the
* exception 'LockGroup xx does not exists'.
* If the graph is closed, don't call serverManager.initialized()
* because it will reopen the graph tx.
*/
if (!serverManager.graphIsReady()) {
return;
}
}

// Execute queued tasks scheduled to current server
scheduler.executeTasksOnWorker(serverManager.selfNodeId());
// Update server heartbeat
serverManager.heartbeat();

/*
* Master will schedule tasks to suitable servers.
* Note a Worker may become a Master, so an elected-Master also needs to
* execute tasks assigned by the previous Master when enableRoleElected=true.
* However, when enableRoleElected=false, a Master is only set by the
* config assignment, so an assigned-Master always stays in the same state.
*/
if (serverManager.selfIsMaster()) {
standardTaskScheduler.scheduleTasksOnMaster();
if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
// assigned-Master + non-single-node don't need to execute tasks
return;
}
}

// Cancel tasks scheduled to current server
scheduler.cancelTasksOnWorker(serverManager.selfNodeId());
} finally {
LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
// Execute queued tasks scheduled to current server
standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId());

// Cancel tasks scheduled to current server
standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId());
} finally {
LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
}
}
}

Expand Down
Loading