Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): optimize the server-node info #2671

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,7 @@ private void initRoleStateMachine(Id serverId) {
conf.get(
RoleElectionOptions.BASE_TIMEOUT_MILLISECOND));
ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig,
roleStore);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, roleStore);
}

@Override
Expand Down Expand Up @@ -1007,7 +1006,7 @@ public void create(String configPath, GlobalMasterInfo nodeInfo) {
this.initBackend();
this.serverStarted(nodeInfo);

// Write config to disk file
// Write config to the disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
Expand Down Expand Up @@ -1349,7 +1348,7 @@ public String schedulerType() {

private class TinkerPopTransaction extends AbstractThreadLocalTransaction {

// Times opened from upper layer
// Times opened from the upper layer
private final AtomicInteger refs;
// Flag opened of each thread
private final ThreadLocal<Boolean> opened;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -47,14 +46,13 @@
public class HugeServerInfo {

// Unit millisecond
private static final long EXPIRED_INTERVAL =
TaskManager.SCHEDULE_PERIOD * 10;
private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD * 10;

private Id id;
private NodeRole role;
private Date updateTime;
private int maxLoad;
private int load;
private Date updateTime;
private final Id id;

private transient boolean updated = false;

Expand Down Expand Up @@ -114,6 +112,10 @@
this.updated = true;
}

public long expireTime() {
return this.updateTime.getTime() + EXPIRED_INTERVAL;

Check warning on line 116 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java#L116

Added line #L116 was not covered by tests
}

public Date updateTime() {
return this.updateTime;
}
Expand Down Expand Up @@ -200,8 +202,7 @@

public static HugeServerInfo fromVertex(Vertex vertex) {
HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id());
for (Iterator<VertexProperty<Object>> iter = vertex.properties();
iter.hasNext(); ) {
for (var iter = vertex.properties(); iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
serverInfo.property(prop.key(), prop.value());
}
Expand Down Expand Up @@ -246,7 +247,7 @@

public static final String SERVER = P.SERVER;

protected final HugeGraphParams graph;
private final HugeGraphParams graph;

public Schema(HugeGraphParams graph) {
this.graph = graph;
Expand All @@ -264,16 +265,14 @@
VertexLabel label = graph.schema().vertexLabel(SERVER)
.properties(properties)
.useCustomizeStringId()
.nullableKeys(P.ROLE, P.MAX_LOAD,
P.LOAD, P.UPDATE_TIME)
.nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD, P.UPDATE_TIME)
.enableLabelIndex(true)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
}

private String[] initProperties() {
List<String> props = new ArrayList<>();

props.add(createPropertyKey(P.ROLE, DataType.BYTE));
props.add(createPropertyKey(P.MAX_LOAD, DataType.INT));
props.add(createPropertyKey(P.LOAD, DataType.INT));
Expand All @@ -283,8 +282,7 @@
}

public boolean existVertexLabel(String label) {
return this.graph.schemaTransaction()
.getVertexLabel(label) != null;
return this.graph.schemaTransaction().getVertexLabel(label) != null;
}

@SuppressWarnings("unused")
Expand All @@ -296,8 +294,7 @@
return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
}

private String createPropertyKey(String name, DataType dataType,
Cardinality cardinality) {
private String createPropertyKey(String name, DataType dataType, Cardinality cardinality) {
SchemaManager schema = this.graph.graph().schema();
PropertyKey propertyKey = schema.propertyKey(name)
.dataType(dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@
private volatile boolean onlySingleNode;
private volatile boolean closed;

public ServerInfoManager(HugeGraphParams graph,
ExecutorService dbExecutor) {
public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) {
E.checkNotNull(graph, "graph");
E.checkNotNull(dbExecutor, "db executor");

Expand Down Expand Up @@ -107,9 +106,20 @@

Id serverId = nodeInfo.nodeId();
HugeServerInfo existed = this.serverInfo(serverId);
if (existed != null && existed.alive()) {
final long now = DateUtil.now().getTime();

Check warning on line 110 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L110

Added line #L110 was not covered by tests
if (existed.expireTime() > now + 30 * 1000) {
LOG.info("The node time maybe skew very much: {}", existed);
throw new HugeException("The server with name '%s' maybe skew very much", serverId);

Check warning on line 113 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L112-L113

Added lines #L112 - L113 were not covered by tests
}
try {
Thread.sleep(existed.expireTime() - now + 1);
} catch (InterruptedException e) {
throw new HugeException("Interrupted when waiting for server info expired", e);
}

Check warning on line 119 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L116-L119

Added lines #L116 - L119 were not covered by tests
}
E.checkArgument(existed == null || !existed.alive(),
"The server with name '%s' already in cluster",
serverId);
"The server with name '%s' already in cluster", serverId);

if (nodeInfo.nodeRole().master()) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
Expand Down Expand Up @@ -185,13 +195,12 @@
/* ServerInfo is missing */
if (this.selfNodeId() == null) {
// Ignore if ServerInfo is not initialized
LOG.info("ServerInfo is missing: {}, may not be initialized yet");
LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId());

Check warning on line 198 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L198

Added line #L198 was not covered by tests
return;
}
if (this.selfIsMaster()) {
// On master node, just wait for ServerInfo re-init
LOG.warn("ServerInfo is missing: {}, may be cleared before",
this.selfNodeId());
// On the master node, just wait for ServerInfo re-init
LOG.warn("ServerInfo is missing: {}, may be cleared before", this.selfNodeId());
return;
}
/*
Expand Down Expand Up @@ -232,12 +241,10 @@
if (!server.alive()) {
continue;
}

if (server.role().master()) {
master = server;
continue;
}

hasWorkerNode = true;
if (!server.suitableFor(task, now)) {
continue;
Expand All @@ -254,13 +261,12 @@
this.onlySingleNode = singleNode;
}

// Only schedule to master if there is no workers and master is suitable
// Only schedule to the master if there are no workers and the master is suitable
if (!hasWorkerNode) {
if (master != null && master.suitableFor(task, now)) {
serverWithMinLoad = master;
}
}

return serverWithMinLoad;
}

Expand All @@ -286,8 +292,7 @@
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
serverInfo.asArray());
HugeVertex vertex = this.tx().constructVertex(false, serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
Expand All @@ -301,8 +306,7 @@
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s",
HugeServerInfo.P.SERVER);
throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER);

Check warning on line 309 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L309

Added line #L309 was not covered by tests
}
// Save server info in batch
GraphTransaction tx = this.tx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
if (this.taskTx == null) {
BackendStore store = this.graph.loadSystemStore();
TaskTransaction tx = new TaskTransaction(this.graph, store);
assert this.taskTx == null; // may be reentrant?
assert this.taskTx == null; // maybe reentrant?
this.taskTx = tx;
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@

if (this.serverManager().onlySingleNode() && !task.computer()) {
/*
* Speed up for single node, submit task immediately,
* Speed up for single node, submit the task immediately,
* this code can be removed without affecting code logic
*/
task.status(TaskStatus.QUEUED);
Expand All @@ -205,7 +205,7 @@
return this.submitTask(task);
} else {
/*
* Just set SCHEDULING status and save task,
* Just set the SCHEDULING status and save the task,
* it will be scheduled by periodic scheduler worker
*/
task.status(TaskStatus.SCHEDULING);
Expand Down Expand Up @@ -276,11 +276,11 @@
assert this.serverManager().selfIsMaster();
if (!task.server().equals(this.serverManager().selfNodeId())) {
/*
* Remove task from memory if it's running on worker node,
* but keep task in memory if it's running on master node.
* cancel-scheduling will read task from backend store, if
* Remove the task from memory if it's running on worker node,
* but keep the task in memory if it's running on master node.
* Cancel-scheduling will read the task from backend store, if
* removed this instance from memory, there will be two task
* instances with same id, and can't cancel the real task that
* instances with the same id, and can't cancel the real task that
* is running but removed from memory.
*/
this.remove(task);
Expand All @@ -301,12 +301,10 @@

protected synchronized void scheduleTasksOnMaster() {
// Master server schedule all scheduling tasks to suitable worker nodes
Collection<HugeServerInfo> serverInfos = this.serverManager()
.allServerInfos();
Collection<HugeServerInfo> serverInfos = this.serverManager().allServerInfos();
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
if (task.server() != null) {
Expand All @@ -318,12 +316,10 @@
return;
}

HugeServerInfo server = this.serverManager().pickWorkerNode(
serverInfos, task);
HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task);
if (server == null) {
LOG.info("The master can't find suitable servers to " +
"execute task '{}', wait for next schedule",
task.id());
"execute task '{}', wait for next schedule", task.id());

Check warning on line 322 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java#L322

Added line #L322 was not covered by tests
continue;
}

Expand All @@ -336,8 +332,7 @@
// Update server load in memory, it will be saved at the ending
server.increaseLoad(task.load());

LOG.info("Scheduled task '{}' to server '{}'",
task.id(), server.id());
LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id());
}
if (page != null) {
page = PageInfo.pageInfo(tasks);
Expand All @@ -351,8 +346,7 @@
protected void executeTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
this.initTaskCallable(task);
Expand Down Expand Up @@ -381,8 +375,7 @@
protected void cancelTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
Id taskServer = task.server();
Expand Down Expand Up @@ -557,10 +550,10 @@

HugeTask<?> task = this.task(id);
/*
* The following is out of date when task running on worker node:
* The following is out of date when the task is running on a worker node:
* HugeTask<?> task = this.tasks.get(id);
* Tasks are removed from memory after completed at most time,
* but there is a tiny gap between tasks are completed and
* but there is a tiny gap between tasks being completed and
* removed from memory.
* We assume tasks only in memory may be incomplete status,
* in fact, it is also possible to appear on the backend tasks
Expand Down Expand Up @@ -621,7 +614,7 @@
throw e;
}
if (task.completed()) {
// Wait for task result being set after status is completed
// Wait for the task result to be set after the status is completed
sleep(intervalMs);
return task;
}
Expand Down
Loading